[#233] services/object: Implement new Get algorithm

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-12-03 02:45:25 +03:00 committed by Alex Vanin
parent 26f03c6301
commit f24daa10ff
31 changed files with 2163 additions and 355 deletions

View file

@ -108,6 +108,7 @@ const (
cfgObjectRangeDialTimeout = "object.range.dial_timeout"
cfgObjectRangeHashDialTimeout = "object.rangehash.dial_timeout"
cfgObjectSearchDialTimeout = "object.search.dial_timeout"
cfgObjectGetDialTimeout = "object.get.dial_timeout"
)
const (

View file

@ -155,8 +155,8 @@ func (s *objectSvc) Search(ctx context.Context, req *object.SearchRequest) (obje
return s.search.Search(ctx, req)
}
func (s *objectSvc) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) {
return s.get.Get(ctx, req)
func (s *objectSvc) Get(req *object.GetRequest, stream objectService.GetObjectStream) error {
return s.get.Get(req, stream)
}
func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
@ -291,6 +291,8 @@ func initObjectService(c *cfg) {
}
})
traverseGen := util.NewTraverserGenerator(c.cfgObject.netMapStorage, c.cfgObject.cnrStorage, c)
c.workers = append(c.workers, pol)
sPut := putsvc.NewService(
@ -372,12 +374,24 @@ func initObjectService(c *cfg) {
rangesvcV2.WithInternalService(sRange),
)
sGet := getsvc.NewService(
getsvc.WithRangeService(sRange),
sGet := getsvc.New(
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientCache(clientCache),
getsvc.WithHeadService(sHead),
getsvc.WithClientOptions(
client.WithDialTimeout(c.viper.GetDuration(cfgObjectGetDialTimeout)),
),
getsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
),
),
)
sGetV2 := getsvcV2.NewService(
getsvcV2.WithInternalService(sGet),
getsvcV2.WithKeyStorage(keyStorage),
)
sRangeHash := rangehashsvc.NewService(

5
go.sum
View file

@ -12,6 +12,7 @@ cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbf
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/storage v1.0.0 h1:VV2nUM3wwLLGh9lSABFgZMjInyUbJeaRSE64WuAIQ+4=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 h1:/EMHruHCFXR9xClkGV/t0rmHrdhX4+trQUcBqjwc9xE=
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
@ -36,8 +37,10 @@ github.com/alecthomas/participle v0.6.0 h1:Pvo8XUCQKgIywVjz/+Ci3IsjGg+g/TdKkMcfg
github.com/alecthomas/participle v0.6.0/go.mod h1:HfdmEuwvr12HXQN44HPWXR0lHmVolVYe4dyL6lQ3duY=
github.com/alecthomas/repr v0.0.0-20181024024818-d37bc2a10ba1/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMwWcqkLzDAQugVEwedisr5nRJ1r+7LYnv0U=
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
@ -345,6 +348,7 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
@ -582,6 +586,7 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/abiosoft/ishell.v2 v2.0.0/go.mod h1:sFp+cGtH6o4s1FtpVPTMcHq2yue+c4DGOVohJCPUzwY=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View file

@ -0,0 +1,28 @@
package object
import (
"github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
)
type getStreamerV2 struct {
objectGRPC.ObjectService_GetServer
}
func (s *getStreamerV2) Send(resp *object.GetResponse) error {
return s.ObjectService_GetServer.Send(
object.GetResponseToGRPCMessage(resp),
)
}
// Get converts gRPC GetRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) Get(req *objectGRPC.GetRequest, gStream objectGRPC.ObjectService_GetServer) error {
// TODO: think about how we transport errors through gRPC
return s.srv.Get(
object.GetRequestFromGRPCMessage(req),
&getStreamerV2{
ObjectService_GetServer: gStream,
},
)
}

View file

@ -6,47 +6,23 @@ import (
"github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
"github.com/pkg/errors"
)
// Server wraps NeoFS API Object service and
// provides gRPC Object service server interface.
type Server struct {
srv object.Service
srv objectSvc.ServiceServer
}
// New creates, initializes and returns Server instance.
func New(c object.Service) *Server {
func New(c objectSvc.ServiceServer) *Server {
return &Server{
srv: c,
}
}
// Get converts gRPC GetRequest message, opens internal Object service Get stream and overtakes its data
// to gRPC stream.
func (s *Server) Get(req *objectGRPC.GetRequest, gStream objectGRPC.ObjectService_GetServer) error {
stream, err := s.srv.Get(gStream.Context(), object.GetRequestFromGRPCMessage(req))
if err != nil {
// TODO: think about how we transport errors through gRPC
return err
}
for {
r, err := stream.Recv()
if err != nil {
if errors.Is(errors.Cause(err), io.EOF) {
return nil
}
return err
}
if err := gStream.Send(object.GetResponseToGRPCMessage(r)); err != nil {
return err
}
}
}
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
stream, err := s.srv.Put(gStream.Context())

View file

@ -18,6 +18,7 @@ import (
core "github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl"
eaclV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl/v2"
"github.com/pkg/errors"
@ -37,7 +38,8 @@ type (
}
getStreamBasicChecker struct {
next object.GetObjectStreamer
objectSvc.GetObjectStream
info requestInfo
*eACLCfg
@ -74,7 +76,7 @@ type cfg struct {
sender SenderClassifier
next object.Service
next objectSvc.ServiceServer
*eACLCfg
}
@ -123,12 +125,10 @@ func New(opts ...Option) Service {
}
}
func (b Service) Get(
ctx context.Context,
request *object.GetRequest) (object.GetObjectStreamer, error) {
func (b Service) Get(request *object.GetRequest, stream objectSvc.GetObjectStream) error {
cid, err := getContainerIDFromRequest(request)
if err != nil {
return nil, err
return err
}
req := metaWithToken{
@ -139,22 +139,20 @@ func (b Service) Get(
reqInfo, err := b.findRequestInfo(req, cid, acl.OperationGet)
if err != nil {
return nil, err
return err
}
if !basicACLCheck(reqInfo) {
return nil, basicACLErr(reqInfo)
return basicACLErr(reqInfo)
} else if !eACLCheck(request, reqInfo, b.eACLCfg) {
return nil, eACLErr(reqInfo)
return eACLErr(reqInfo)
}
stream, err := b.next.Get(ctx, request)
return getStreamBasicChecker{
next: stream,
info: reqInfo,
eACLCfg: b.eACLCfg,
}, err
return b.next.Get(request, &getStreamBasicChecker{
GetObjectStream: stream,
info: reqInfo,
eACLCfg: b.eACLCfg,
})
}
func (b Service) Put(ctx context.Context) (object.PutObjectStreamer, error) {
@ -366,32 +364,14 @@ func (p putStreamBasicChecker) CloseAndRecv() (*object.PutResponse, error) {
return p.next.CloseAndRecv()
}
func (g getStreamBasicChecker) Recv() (*object.GetResponse, error) {
resp, err := g.next.Recv()
if err != nil {
return resp, err
}
body := resp.GetBody()
if body == nil {
return resp, err
}
part := body.GetObjectPart()
if _, ok := part.(*object.GetObjectPartInit); ok {
ownerID, err := getObjectOwnerFromMessage(resp)
if err != nil {
return nil, err
}
if !stickyBitCheck(g.info, ownerID) {
return nil, basicACLErr(g.info)
} else if !eACLCheck(resp, g.info, g.eACLCfg) {
return nil, eACLErr(g.info)
func (g *getStreamBasicChecker) Send(resp *object.GetResponse) error {
if _, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartInit); ok {
if !eACLCheck(resp, g.info, g.eACLCfg) {
return eACLErr(g.info)
}
}
return resp, err
return g.GetObjectStream.Send(resp)
}
func (b Service) findRequestInfo(

View file

@ -1,10 +1,10 @@
package acl
import (
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl"
)
@ -23,7 +23,7 @@ func WithSenderClassifier(v SenderClassifier) Option {
}
// WithNextService returns option to set next object service.
func WithNextService(v object.Service) Option {
func WithNextService(v objectSvc.ServiceServer) Option {
return func(c *cfg) {
c.next = v
}

View file

@ -0,0 +1,130 @@
package getsvc
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"go.uber.org/zap"
)
func (exec *execCtx) assemble() {
if !exec.canAssemble() {
exec.log.Debug("can not assemble the object")
return
}
exec.log.Debug("trying to assemble the object...")
splitInfo := exec.splitInfo()
childID := splitInfo.Link()
if childID == nil {
childID = splitInfo.LastPart()
}
prev, children := exec.initFromChild(childID)
if len(children) > 0 {
if ok := exec.writeCollectedHeader(); ok {
exec.overtakePayloadDirectly(children)
}
} else if prev != nil {
if ok := exec.writeCollectedHeader(); ok {
// TODO: choose one-by-one restoring algorithm according to size
// * if size > MAX => go right-to-left with HEAD and back with GET
// * else go right-to-left with GET and compose in single object before writing
if ok := exec.overtakePayloadInReverse(prev); ok {
// payload of all children except the last are written, write last payload
exec.writeObjectPayload(exec.collectedObject)
}
}
} else {
exec.status = statusUndefined
exec.err = object.ErrNotFound
exec.log.Debug("could not init parent from child")
}
}
func (exec *execCtx) initFromChild(id *objectSDK.ID) (prev *objectSDK.ID, children []*objectSDK.ID) {
log := exec.log.With(zap.Stringer("child ID", id))
log.Debug("starting assembling from child")
child, ok := exec.getChild(id)
if !ok {
return
}
par := child.GetParent()
if par == nil {
exec.status = statusUndefined
log.Debug("received child with empty parent")
return
} else if !equalAddresses(par.Address(), exec.address()) {
exec.status = statusUndefined
log.Debug("parent address in child object differs",
zap.Stringer("expected", exec.address()),
zap.Stringer("received", par.Address()),
)
return
}
exec.collectedObject = par
object.NewRawFromObject(exec.collectedObject).SetPayload(child.Payload())
return child.PreviousID(), child.Children()
}
func (exec *execCtx) overtakePayloadDirectly(children []*objectSDK.ID) {
for i := range children {
child, ok := exec.getChild(children[i])
if !ok {
return
}
if ok := exec.writeObjectPayload(child); !ok {
return
}
}
exec.status = statusOK
exec.err = nil
}
func (exec *execCtx) overtakePayloadInReverse(prev *objectSDK.ID) bool {
chain := make([]*objectSDK.ID, 0)
// fill the chain end-to-start
for prev != nil {
head, ok := exec.headChild(prev)
if !ok {
return false
}
chain = append(chain, head.ID())
prev = head.PreviousID()
}
// reverse chain
for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 {
chain[left], chain[right] = chain[right], chain[left]
}
exec.overtakePayloadDirectly(chain)
exec.status = statusOK
exec.err = nil
return true
}
func equalAddresses(a, b *objectSDK.Address) bool {
return a.ContainerID().Equal(b.ContainerID()) &&
a.ObjectID().Equal(b.ObjectID())
}

View file

@ -0,0 +1,54 @@
package getsvc
import (
"context"
"go.uber.org/zap"
)
func (exec *execCtx) executeOnContainer() {
if exec.isLocal() {
exec.log.Debug("return result directly")
return
}
exec.log.Debug("trying to execute in container...")
traverser, ok := exec.generateTraverser(exec.address())
if !ok {
return
}
ctx, cancel := context.WithCancel(exec.context())
defer cancel()
exec.status = statusUndefined
loop:
for {
addrs := traverser.Next()
if len(addrs) == 0 {
exec.log.Debug("no more nodes, abort placement iteration")
break
}
for i := range addrs {
select {
case <-ctx.Done():
exec.log.Debug("interrupt placement iteration by context",
zap.String("error", ctx.Err().Error()),
)
break loop
default:
}
// TODO: consider parallel execution
// TODO: consider optimization: if status == SPLIT we can continue until
// we reach the best result - split info with linking object ID.
if exec.processNode(ctx, addrs[i]) {
exec.log.Debug("completing the operation")
break loop
}
}
}
}

View file

@ -0,0 +1,250 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
type statusError struct {
status int
err error
}
type execCtx struct {
svc *Service
ctx context.Context
prm Prm
statusError
infoSplit *objectSDK.SplitInfo
log *logger.Logger
collectedObject *object.Object
}
const (
statusUndefined int = iota
statusOK
statusINHUMED
statusVIRTUAL
)
func (exec *execCtx) setLogger(l *logger.Logger) {
exec.log = l.With(
zap.String("request", "GET"),
zap.Stringer("address", exec.address()),
zap.Bool("raw", exec.isRaw()),
zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)
}
func (exec execCtx) context() context.Context {
return exec.ctx
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec execCtx) isRaw() bool {
return exec.prm.raw
}
func (exec execCtx) address() *objectSDK.Address {
return exec.prm.addr
}
func (exec execCtx) key() *ecdsa.PrivateKey {
return exec.prm.key
}
func (exec execCtx) callOptions() []client.CallOption {
return exec.prm.callOpts
}
func (exec execCtx) remotePrm() *client.GetObjectParams {
return new(client.GetObjectParams).
WithAddress(exec.prm.addr).
WithRawFlag(exec.prm.raw)
}
func (exec *execCtx) canAssemble() bool {
return exec.svc.assembly && !exec.isRaw()
}
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
return exec.infoSplit
}
func (exec *execCtx) containerID() *container.ID {
return exec.address().ContainerID()
}
func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not generate container traverser",
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
return t, true
}
}
func (exec *execCtx) getChild(id *objectSDK.ID) (*object.Object, bool) {
w := newSimpleObjectWriter()
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.SetObjectWriter(w)
addr := objectSDK.NewAddress()
addr.SetContainerID(exec.address().ContainerID())
addr.SetObjectID(id)
p.SetAddress(addr)
exec.statusError = exec.svc.get(exec.context(), p)
return w.object(), exec.status == statusOK
}
func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) {
childAddr := objectSDK.NewAddress()
childAddr.SetContainerID(exec.containerID())
childAddr.SetObjectID(id)
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.SetAddress(childAddr)
header, err := exec.svc.headSvc.head(exec.context(), p)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get child object header",
zap.Stringer("child ID", id),
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
exec.status = statusOK
exec.err = nil
return header, true
}
}
func (exec execCtx) remoteClient(node *network.Address) (getClient, bool) {
ipAddr, err := node.IPAddrString()
log := exec.log.With(zap.Stringer("node", node))
switch {
default:
exec.status = statusUndefined
exec.err = err
log.Debug("could not calculate node IP address")
case err == nil:
c, err := exec.svc.clientCache.get(exec.key(), ipAddr)
switch {
default:
exec.status = statusUndefined
exec.err = err
log.Debug("could not construct remote node client")
case err == nil:
return c, true
}
}
return nil, false
}
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
if last := src.LastPart(); last != nil {
dst.SetLastPart(last)
}
if link := src.Link(); link != nil {
dst.SetLink(link)
}
if splitID := src.SplitID(); splitID != nil {
dst.SetSplitID(splitID)
}
}
func (exec *execCtx) writeCollectedHeader() bool {
err := exec.prm.objWriter.WriteHeader(
object.NewRawFromObject(exec.collectedObject).CutPayload().Object(),
)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not write header",
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
return exec.status == statusOK
}
func (exec *execCtx) writeObjectPayload(obj *object.Object) bool {
err := exec.prm.objWriter.WriteChunk(obj.Payload())
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not write payload chunk",
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
return err == nil
}
func (exec *execCtx) writeCollectedObject() {
if ok := exec.writeCollectedHeader(); ok {
exec.writeObjectPayload(exec.collectedObject)
}
}

View file

@ -0,0 +1,59 @@
package getsvc
import (
"context"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"go.uber.org/zap"
)
// Get serves a request to get an object by address, and returns Streamer instance.
func (s *Service) Get(ctx context.Context, prm Prm) error {
return s.get(ctx, prm).err
}
func (s *Service) get(ctx context.Context, prm Prm) statusError {
exec := &execCtx{
svc: s,
ctx: ctx,
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
}
exec.setLogger(s.log)
exec.execute()
return exec.statusError
}
func (exec *execCtx) execute() {
exec.log.Debug("serving request...")
// perform local operation
exec.executeLocal()
exec.analyzeStatus(true)
}
func (exec *execCtx) analyzeStatus(execCnr bool) {
// analyze local result
switch exec.status {
case statusOK:
exec.log.Debug("operation finished successfully")
case statusINHUMED:
exec.log.Debug("requested object was marked as removed")
case statusVIRTUAL:
exec.log.Debug("requested object is virtual")
exec.assemble()
default:
exec.log.Debug("operation finished with error",
zap.String("error", exec.err.Error()),
)
if execCnr {
exec.executeOnContainer()
exec.analyzeStatus(false)
}
}
}

View file

@ -0,0 +1,847 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"crypto/sha256"
"fmt"
"strconv"
"testing"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)
type testStorage struct {
inhumed map[string]struct{}
virtual map[string]*objectSDK.SplitInfo
phy map[string]*object.Object
}
type testTraverserGenerator struct {
c *container.Container
b placement.Builder
}
type testPlacementBuilder struct {
vectors map[string][]netmap.Nodes
}
type testClientCache struct {
clients map[string]*testClient
}
type testClient struct {
results map[string]struct {
obj *object.RawObject
err error
}
}
func newTestStorage() *testStorage {
return &testStorage{
inhumed: make(map[string]struct{}),
virtual: make(map[string]*objectSDK.SplitInfo),
phy: make(map[string]*object.Object),
}
}
func (g *testTraverserGenerator) GenerateTraverser(addr *objectSDK.Address) (*placement.Traverser, error) {
return placement.NewTraverser(
placement.ForContainer(g.c),
placement.ForObject(addr.ObjectID()),
placement.UseBuilder(g.b),
placement.SuccessAfter(1),
)
}
func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap.PlacementPolicy) ([]netmap.Nodes, error) {
vs, ok := p.vectors[addr.String()]
if !ok {
return nil, errors.New("vectors for address not found")
}
return vs, nil
}
func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (getClient, error) {
v, ok := c.clients[addr]
if !ok {
return nil, errors.New("could not construct client")
}
return v, nil
}
func newTestClient() *testClient {
return &testClient{
results: map[string]struct {
obj *object.RawObject
err error
}{},
}
}
func (c *testClient) GetObject(_ context.Context, p Prm) (*objectSDK.Object, error) {
v, ok := c.results[p.addr.String()]
if !ok {
return nil, object.ErrNotFound
}
return v.obj.Object().SDK(), v.err
}
func (c *testClient) head(_ context.Context, p Prm) (*object.Object, error) {
v, ok := c.results[p.addr.String()]
if !ok {
return nil, object.ErrNotFound
}
if v.err != nil {
return nil, v.err
}
return v.obj.CutPayload().Object(), nil
}
func (c *testClient) addResult(addr *objectSDK.Address, obj *object.RawObject, err error) {
c.results[addr.String()] = struct {
obj *object.RawObject
err error
}{obj: obj, err: err}
}
func (s *testStorage) Get(addr *objectSDK.Address) (*object.Object, error) {
var (
ok bool
obj *object.Object
sAddr = addr.String()
)
if _, ok = s.inhumed[sAddr]; ok {
return nil, object.ErrAlreadyRemoved
}
if info, ok := s.virtual[sAddr]; ok {
return nil, objectSDK.NewSplitInfoError(info)
}
if obj, ok = s.phy[sAddr]; ok {
return obj, nil
}
return nil, object.ErrNotFound
}
func (s *testStorage) addPhy(addr *objectSDK.Address, obj *object.RawObject) {
s.phy[addr.String()] = obj.Object()
}
func (s *testStorage) addVirtual(addr *objectSDK.Address, info *objectSDK.SplitInfo) {
s.virtual[addr.String()] = info
}
func (s *testStorage) inhume(addr *objectSDK.Address) {
s.inhumed[addr.String()] = struct{}{}
}
func testSHA256() (cs [sha256.Size]byte) {
rand.Read(cs[:])
return cs
}
func generateID() *objectSDK.ID {
id := objectSDK.NewID()
id.SetSHA256(testSHA256())
return id
}
func generateAddress() *objectSDK.Address {
addr := objectSDK.NewAddress()
addr.SetObjectID(generateID())
cid := container.NewID()
cid.SetSHA256(testSHA256())
addr.SetContainerID(cid)
return addr
}
func generateObject(addr *objectSDK.Address, prev *objectSDK.ID, payload []byte, children ...*objectSDK.ID) *object.RawObject {
obj := object.NewRaw()
obj.SetContainerID(addr.ContainerID())
obj.SetID(addr.ObjectID())
obj.SetPayload(payload)
obj.SetPreviousID(prev)
obj.SetChildren(children...)
return obj
}
func TestGetLocalOnly(t *testing.T) {
ctx := context.Background()
newSvc := func(storage *testStorage) *Service {
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = storage
svc.assembly = true
return svc
}
newPrm := func(raw bool, w ObjectWriter) Prm {
return Prm{
objWriter: w,
raw: raw,
common: new(util.CommonPrm).WithLocalOnly(true),
}
}
t.Run("OK", func(t *testing.T) {
storage := newTestStorage()
svc := newSvc(storage)
w := newSimpleObjectWriter()
p := newPrm(false, w)
addr := generateAddress()
obj := generateObject(addr, nil, nil)
storage.addPhy(addr, obj)
p.addr = addr
storage.addPhy(addr, obj)
err := svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, obj.Object(), w.object())
})
t.Run("INHUMED", func(t *testing.T) {
storage := newTestStorage()
svc := newSvc(storage)
p := newPrm(false, nil)
addr := generateAddress()
storage.inhume(addr)
p.addr = addr
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
})
t.Run("404", func(t *testing.T) {
storage := newTestStorage()
svc := newSvc(storage)
p := newPrm(false, nil)
addr := generateAddress()
p.addr = addr
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("VIRTUAL", func(t *testing.T) {
storage := newTestStorage()
svc := newSvc(storage)
p := newPrm(true, nil)
addr := generateAddress()
splitInfo := objectSDK.NewSplitInfo()
splitInfo.SetSplitID(objectSDK.NewSplitID())
splitInfo.SetLink(generateID())
splitInfo.SetLastPart(generateID())
p.addr = addr
storage.addVirtual(addr, splitInfo)
err := svc.Get(ctx, p)
errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())
require.True(t, errors.As(err, &errSplit))
require.Equal(t, splitInfo, errSplit.SplitInfo())
})
}
func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) {
mNodes := make([]netmap.Nodes, len(dim))
mAddr := make([][]string, len(dim))
for i := range dim {
ns := make([]netmap.NodeInfo, dim[i])
as := make([]string, dim[i])
for j := 0; j < dim[i]; j++ {
a := fmt.Sprintf("/ip4/192.168.0.%s/tcp/%s",
strconv.Itoa(i),
strconv.Itoa(60000+j),
)
var err error
na, err := network.AddressFromString(a)
require.NoError(t, err)
as[j], err = na.IPAddrString()
require.NoError(t, err)
ni := netmap.NewNodeInfo()
ni.SetAddress(a)
ns[j] = *ni
}
mNodes[i] = netmap.NodesFromInfo(ns)
mAddr[i] = as
}
return mNodes, mAddr
}
func generateChain(ln int, cid *container.ID) ([]*object.RawObject, []*objectSDK.ID, []byte) {
curID := generateID()
var prevID *objectSDK.ID
addr := objectSDK.NewAddress()
addr.SetContainerID(cid)
res := make([]*object.RawObject, 0, ln)
ids := make([]*objectSDK.ID, 0, ln)
payload := make([]byte, 0, ln*10)
for i := 0; i < ln; i++ {
ids = append(ids, curID)
addr.SetObjectID(curID)
payloadPart := make([]byte, 10)
rand.Read(payloadPart)
o := generateObject(addr, prevID, []byte{byte(i)})
o.SetPayload(payloadPart)
o.SetID(curID)
payload = append(payload, payloadPart...)
res = append(res, o)
prevID = curID
curID = generateID()
}
return res, ids, payload
}
func TestGetRemoteSmall(t *testing.T) {
ctx := context.Background()
cnr := container.New(container.WithPolicy(new(netmap.PlacementPolicy)))
cid := container.CalculateID(cnr)
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(true)
svc.localStorage = newTestStorage()
svc.assembly = true
svc.traverserGenerator = &testTraverserGenerator{
c: cnr,
b: b,
}
svc.clientCache = c
return svc
}
newPrm := func(raw bool, w ObjectWriter) Prm {
return Prm{
objWriter: w,
raw: raw,
common: new(util.CommonPrm).WithLocalOnly(false),
}
}
t.Run("OK", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
ns, as := testNodeMatrix(t, []int{2})
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns,
},
}
obj := generateObject(addr, nil, nil)
c1 := newTestClient()
c1.addResult(addr, obj, nil)
c2 := newTestClient()
c2.addResult(addr, nil, errors.New("any error"))
svc := newSvc(builder, &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
})
w := newSimpleObjectWriter()
p := newPrm(false, w)
p.addr = addr
err := svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, obj.Object(), w.object())
*c1, *c2 = *c2, *c1
err = svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, obj.Object(), w.object())
})
t.Run("INHUMED", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
ns, as := testNodeMatrix(t, []int{2})
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns,
},
}
c1 := newTestClient()
c1.addResult(addr, nil, errors.New("any error"))
c2 := newTestClient()
c2.addResult(addr, nil, object.ErrAlreadyRemoved)
svc := newSvc(builder, &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
})
p := newPrm(false, nil)
p.addr = addr
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
})
t.Run("404", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
ns, as := testNodeMatrix(t, []int{2})
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns,
},
}
c1 := newTestClient()
c1.addResult(addr, nil, errors.New("any error"))
c2 := newTestClient()
c2.addResult(addr, nil, errors.New("any error"))
svc := newSvc(builder, &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
})
p := newPrm(false, nil)
p.addr = addr
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("VIRTUAL", func(t *testing.T) {
t.Run("linking", func(t *testing.T) {
t.Run("get linking failure", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
addr.SetObjectID(generateID())
ns, as := testNodeMatrix(t, []int{2})
splitInfo := objectSDK.NewSplitInfo()
splitInfo.SetLink(generateID())
splitAddr := objectSDK.NewAddress()
splitAddr.SetContainerID(cid)
splitAddr.SetObjectID(splitInfo.Link())
c1 := newTestClient()
c1.addResult(addr, nil, errors.New("any error"))
c1.addResult(splitAddr, nil, object.ErrNotFound)
c2 := newTestClient()
c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
c2.addResult(splitAddr, nil, object.ErrNotFound)
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns,
splitAddr.String(): ns,
},
}
svc := newSvc(builder, &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
})
p := newPrm(false, nil)
p.addr = addr
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("get chain element failure", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
addr.SetObjectID(generateID())
srcObj := generateObject(addr, nil, nil)
ns, as := testNodeMatrix(t, []int{2})
splitInfo := objectSDK.NewSplitInfo()
splitInfo.SetLink(generateID())
children, childIDs, _ := generateChain(2, cid)
linkAddr := objectSDK.NewAddress()
linkAddr.SetContainerID(cid)
linkAddr.SetObjectID(splitInfo.Link())
linkingObj := generateObject(linkAddr, nil, nil, childIDs...)
linkingObj.SetParentID(addr.ObjectID())
linkingObj.SetParent(srcObj.Object().SDK())
child1Addr := objectSDK.NewAddress()
child1Addr.SetContainerID(cid)
child1Addr.SetObjectID(childIDs[0])
child2Addr := objectSDK.NewAddress()
child2Addr.SetContainerID(cid)
child2Addr.SetObjectID(childIDs[1])
c1 := newTestClient()
c1.addResult(addr, nil, errors.New("any error"))
c1.addResult(linkAddr, nil, errors.New("any error"))
c1.addResult(child1Addr, nil, errors.New("any error"))
c1.addResult(child2Addr, nil, errors.New("any error"))
c2 := newTestClient()
c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
c2.addResult(linkAddr, linkingObj, nil)
c2.addResult(child1Addr, children[0], nil)
c2.addResult(child2Addr, nil, errors.New("any error"))
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns,
linkAddr.String(): ns,
child1Addr.String(): ns,
child2Addr.String(): ns,
},
}
svc := newSvc(builder, &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
})
p := newPrm(false, newSimpleObjectWriter())
p.addr = addr
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("OK", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
addr.SetObjectID(generateID())
srcObj := generateObject(addr, nil, nil)
ns, as := testNodeMatrix(t, []int{2})
splitInfo := objectSDK.NewSplitInfo()
splitInfo.SetLink(generateID())
children, childIDs, payload := generateChain(2, cid)
srcObj.SetPayload(payload)
linkAddr := objectSDK.NewAddress()
linkAddr.SetContainerID(cid)
linkAddr.SetObjectID(splitInfo.Link())
linkingObj := generateObject(linkAddr, nil, nil, childIDs...)
linkingObj.SetParentID(addr.ObjectID())
linkingObj.SetParent(srcObj.Object().SDK())
child1Addr := objectSDK.NewAddress()
child1Addr.SetContainerID(cid)
child1Addr.SetObjectID(childIDs[0])
child2Addr := objectSDK.NewAddress()
child2Addr.SetContainerID(cid)
child2Addr.SetObjectID(childIDs[1])
c1 := newTestClient()
c1.addResult(addr, nil, errors.New("any error"))
c1.addResult(linkAddr, nil, errors.New("any error"))
c1.addResult(child1Addr, nil, errors.New("any error"))
c1.addResult(child2Addr, nil, errors.New("any error"))
c2 := newTestClient()
c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
c2.addResult(linkAddr, linkingObj, nil)
c2.addResult(child1Addr, children[0], nil)
c2.addResult(child2Addr, children[1], nil)
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns,
linkAddr.String(): ns,
child1Addr.String(): ns,
child2Addr.String(): ns,
},
}
svc := newSvc(builder, &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
})
w := newSimpleObjectWriter()
p := newPrm(false, w)
p.addr = addr
err := svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, srcObj.Object(), w.object())
})
})
t.Run("right child", func(t *testing.T) {
t.Run("get right child failure", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
addr.SetObjectID(generateID())
ns, as := testNodeMatrix(t, []int{2})
splitInfo := objectSDK.NewSplitInfo()
splitInfo.SetLastPart(generateID())
splitAddr := objectSDK.NewAddress()
splitAddr.SetContainerID(cid)
splitAddr.SetObjectID(splitInfo.LastPart())
c1 := newTestClient()
c1.addResult(addr, nil, errors.New("any error"))
c1.addResult(splitAddr, nil, object.ErrNotFound)
c2 := newTestClient()
c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
c2.addResult(splitAddr, nil, object.ErrNotFound)
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns,
splitAddr.String(): ns,
},
}
svc := newSvc(builder, &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
})
p := newPrm(false, nil)
p.addr = addr
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("get chain element failure", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
addr.SetObjectID(generateID())
srcObj := generateObject(addr, nil, nil)
ns, as := testNodeMatrix(t, []int{2})
splitInfo := objectSDK.NewSplitInfo()
splitInfo.SetLastPart(generateID())
children, _, _ := generateChain(2, cid)
rightAddr := objectSDK.NewAddress()
rightAddr.SetContainerID(cid)
rightAddr.SetObjectID(splitInfo.LastPart())
rightObj := children[len(children)-1]
rightObj.SetParentID(addr.ObjectID())
rightObj.SetParent(srcObj.Object().SDK())
preRightAddr := children[len(children)-2].Object().Address()
c1 := newTestClient()
c1.addResult(addr, nil, errors.New("any error"))
c1.addResult(rightAddr, nil, errors.New("any error"))
c2 := newTestClient()
c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
c2.addResult(rightAddr, rightObj, nil)
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns,
rightAddr.String(): ns,
preRightAddr.String(): ns,
preRightAddr.String(): ns,
},
}
svc := newSvc(builder, &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
})
headSvc := newTestClient()
headSvc.addResult(preRightAddr, nil, object.ErrNotFound)
svc.headSvc = headSvc
p := newPrm(false, newSimpleObjectWriter())
p.addr = addr
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("OK", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
addr.SetObjectID(generateID())
srcObj := generateObject(addr, nil, nil)
ns, as := testNodeMatrix(t, []int{2})
splitInfo := objectSDK.NewSplitInfo()
splitInfo.SetLastPart(generateID())
children, _, payload := generateChain(2, cid)
srcObj.SetPayload(payload)
rightObj := children[len(children)-1]
rightObj.SetID(splitInfo.LastPart())
rightObj.SetParentID(addr.ObjectID())
rightObj.SetParent(srcObj.Object().SDK())
c1 := newTestClient()
c1.addResult(addr, nil, errors.New("any error"))
for i := range children {
c1.addResult(children[i].Object().Address(), nil, errors.New("any error"))
}
c2 := newTestClient()
c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
for i := range children {
c2.addResult(children[i].Object().Address(), children[i], nil)
}
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{},
}
builder.vectors[addr.String()] = ns
for i := range children {
builder.vectors[children[i].Object().Address().String()] = ns
}
svc := newSvc(builder, &testClientCache{
clients: map[string]*testClient{
as[0][0]: c1,
as[0][1]: c2,
},
})
svc.headSvc = c2
w := newSimpleObjectWriter()
p := newPrm(false, w)
p.addr = addr
err := svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, srcObj.Object(), w.object())
})
})
})
}

View file

@ -0,0 +1,37 @@
package getsvc
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/pkg/errors"
"go.uber.org/zap"
)
func (exec *execCtx) executeLocal() {
var err error
exec.collectedObject, err = exec.svc.localStorage.Get(exec.address())
var errSplitInfo *objectSDK.SplitInfoError
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("local get failed",
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
exec.writeCollectedObject()
case errors.Is(err, object.ErrAlreadyRemoved):
exec.status = statusINHUMED
exec.err = object.ErrAlreadyRemoved
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
}
}

View file

@ -1,30 +1,59 @@
package getsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
)
// Prm groups parameters of Get service call.
type Prm struct {
objWriter ObjectWriter
// TODO: replace key and callOpts to CommonPrm
key *ecdsa.PrivateKey
callOpts []client.CallOption
common *util.CommonPrm
full bool
// TODO: use parameters from NeoFS SDK
addr *objectSDK.Address
addr *object.Address
raw bool
}
func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm {
if p != nil {
p.common = v
}
return p
// ObjectWriter is an interface of target component to write object.
type ObjectWriter interface {
WriteHeader(*object.Object) error
WriteChunk([]byte) error
}
func (p *Prm) WithAddress(v *object.Address) *Prm {
if p != nil {
p.addr = v
}
return p
// SetObjectWriter sets target component to write the object.
func (p *Prm) SetObjectWriter(w ObjectWriter) {
p.objWriter = w
}
// SetPrivateKey sets private key to use during execution.
func (p *Prm) SetPrivateKey(key *ecdsa.PrivateKey) {
p.key = key
}
// SetRemoteCallOptions sets call options remote remote client calls.
func (p *Prm) SetRemoteCallOptions(opts ...client.CallOption) {
p.callOpts = opts
}
// SetAddress sets address of the requested object.
func (p *Prm) SetAddress(addr *objectSDK.Address) {
p.addr = addr
}
// SetRaw sets raw flag. If flag is set,
// object assembling will not be undertaken.
func (p *Prm) SetRaw(raw bool) {
p.raw = raw
}

View file

@ -0,0 +1,50 @@
package getsvc
import (
"context"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/pkg/errors"
"go.uber.org/zap"
)
func (exec *execCtx) processNode(ctx context.Context, addr *network.Address) bool {
log := exec.log.With(zap.Stringer("remote node", addr))
log.Debug("processing node...")
client, ok := exec.remoteClient(addr)
if !ok {
return true
}
obj, err := client.GetObject(ctx, exec.prm)
var errSplitInfo *objectSDK.SplitInfoError
switch {
default:
exec.status = statusUndefined
exec.err = object.ErrNotFound
log.Debug("remote call failed",
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
exec.collectedObject = object.NewFromSDK(obj)
exec.writeCollectedObject()
case errors.Is(err, object.ErrAlreadyRemoved):
exec.status = statusINHUMED
exec.err = object.ErrAlreadyRemoved
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
}
return exec.status != statusUndefined
}

View file

@ -2,26 +2,67 @@ package getsvc
import (
"context"
"crypto/ecdsa"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
"github.com/pkg/errors"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Service utility serving requests of Object.Get service.
type Service struct {
*cfg
}
// Option is a Service's constructor option.
type Option func(*cfg)
type getClient interface {
GetObject(context.Context, Prm) (*objectSDK.Object, error)
}
type cfg struct {
rngSvc *rangesvc.Service
assembly bool
log *logger.Logger
headSvc interface {
head(context.Context, Prm) (*object.Object, error)
}
localStorage interface {
Get(*objectSDK.Address) (*object.Object, error)
}
clientCache interface {
get(*ecdsa.PrivateKey, string) (getClient, error)
}
traverserGenerator interface {
GenerateTraverser(*objectSDK.Address) (*placement.Traverser, error)
}
}
func defaultCfg() *cfg {
return new(cfg)
return &cfg{
assembly: true,
log: zap.L(),
headSvc: new(headSvcWrapper),
localStorage: new(storageEngineWrapper),
clientCache: new(clientCacheWrapper),
}
}
func NewService(opts ...Option) *Service {
// New creates, initializes and returns utility serving
// Object.Get service requests.
func New(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
@ -33,23 +74,53 @@ func NewService(opts ...Option) *Service {
}
}
func (s *Service) Get(ctx context.Context, prm *Prm) (*Streamer, error) {
r, err := s.rngSvc.GetRange(ctx, new(rangesvc.Prm).
WithAddress(prm.addr).
FullRange().
WithCommonPrm(prm.common),
)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not get range", s)
}
return &Streamer{
rngRes: r,
}, nil
}
func WithRangeService(v *rangesvc.Service) Option {
// WithLogger returns option to specify Get service's logger.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.rngSvc = v
c.log = l.With(zap.String("component", "Object.Get service"))
}
}
// WithoutAssembly returns option to disable object assembling.
func WithoutAssembly() Option {
return func(c *cfg) {
c.assembly = false
}
}
// WithLocalStorageEngine returns option to set local storage
// instance.
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
return func(c *cfg) {
c.localStorage.(*storageEngineWrapper).engine = e
}
}
// WithClientCache returns option to set cache of remote node clients.
func WithClientCache(v *cache.ClientCache) Option {
return func(c *cfg) {
c.clientCache.(*clientCacheWrapper).cache = v
}
}
// WithClientOptions returns option to specify options of remote node clients.
func WithClientOptions(opts ...client.Option) Option {
return func(c *cfg) {
c.clientCache.(*clientCacheWrapper).opts = opts
}
}
// WithTraverserGenerator returns option to set generator of
// placement traverser to get the objects from containers.
func WithTraverserGenerator(t *util.TraverserGenerator) Option {
return func(c *cfg) {
c.traverserGenerator = t
}
}
// WithHeadService returns option to set the utility serving object.Head.
func WithHeadService(svc *headsvc.Service) Option {
return func(c *cfg) {
c.headSvc.(*headSvcWrapper).svc = svc
}
}

View file

@ -1,26 +0,0 @@
package getsvc
import (
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
"github.com/pkg/errors"
)
type Streamer struct {
headSent bool
rngRes *rangesvc.Result
}
func (p *Streamer) Recv() (interface{}, error) {
if !p.headSent {
p.headSent = true
return p.rngRes.Head(), nil
}
rngResp, err := p.rngRes.Stream().Recv()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive range response", p)
}
return rngResp.PayloadChunk(), nil
}

View file

@ -0,0 +1,110 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
)
type simpleObjectWriter struct {
obj *object.RawObject
payload []byte
}
type clientCacheWrapper struct {
cache *cache.ClientCache
opts []client.Option
}
type clientWrapper struct {
client *client.Client
}
type storageEngineWrapper struct {
engine *engine.StorageEngine
}
type headSvcWrapper struct {
svc *headsvc.Service
}
func newSimpleObjectWriter() *simpleObjectWriter {
return new(simpleObjectWriter)
}
func (s *simpleObjectWriter) WriteHeader(obj *object.Object) error {
s.obj = object.NewRawFromObject(obj)
s.payload = make([]byte, 0, obj.PayloadSize())
return nil
}
func (s *simpleObjectWriter) WriteChunk(p []byte) error {
s.payload = append(s.payload, p...)
return nil
}
func (s *simpleObjectWriter) Close() error {
return nil
}
func (s *simpleObjectWriter) object() *object.Object {
if len(s.payload) > 0 {
s.obj.SetPayload(s.payload)
}
return s.obj.Object()
}
func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (getClient, error) {
clt, err := c.cache.Get(key, addr, c.opts...)
return &clientWrapper{
client: clt,
}, err
}
func (c *clientWrapper) GetObject(ctx context.Context, p Prm) (*objectSDK.Object, error) {
// we don't specify payload writer because we accumulate
// the object locally (even huge).
return c.client.GetObject(ctx,
new(client.GetObjectParams).
WithAddress(p.addr).
WithRawFlag(true),
p.callOpts...,
)
}
func (e *storageEngineWrapper) Get(addr *objectSDK.Address) (*object.Object, error) {
r, err := e.engine.Get(new(engine.GetPrm).
WithAddress(addr),
)
if err != nil {
return nil, err
}
return r.Object(), nil
}
func (s *headSvcWrapper) head(ctx context.Context, p Prm) (*object.Object, error) {
r, err := s.svc.Head(ctx, new(headsvc.Prm).
WithAddress(p.addr).
WithCommonPrm(p.common).
Short(false),
)
if err != nil {
return nil, err
}
return r.Header(), nil
}

View file

@ -1,10 +1,11 @@
package getsvc
import (
"context"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/pkg/errors"
)
@ -18,6 +19,8 @@ type Option func(*cfg)
type cfg struct {
svc *getsvc.Service
keyStorage *objutil.KeyStorage
}
// NewService constructs Service instance from provided options.
@ -34,13 +37,22 @@ func NewService(opts ...Option) *Service {
}
// Get calls internal service and returns v2 object stream.
func (s *Service) Get(ctx context.Context, req *objectV2.GetRequest) (objectV2.GetObjectStreamer, error) {
stream, err := s.svc.Get(ctx, toPrm(req))
func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) error {
p, err := s.toPrm(req, stream)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not get object payload range data", s)
return err
}
return fromResponse(stream), nil
err = s.svc.Get(stream.Context(), *p)
var splitErr *object.SplitInfoError
switch {
case errors.As(err, &splitErr):
return stream.Send(splitInfoResponse(splitErr.SplitInfo()))
default:
return err
}
}
func WithInternalService(v *getsvc.Service) Option {
@ -48,3 +60,10 @@ func WithInternalService(v *getsvc.Service) Option {
c.svc = v
}
}
// WithKeyStorage returns option to set local private key storage.
func WithKeyStorage(ks *objutil.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = ks
}
}

View file

@ -1,47 +1,40 @@
package getsvc
import (
"fmt"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/pkg/errors"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
)
type streamer struct {
stream *getsvc.Streamer
body *objectV2.GetResponseBody
type streamObjectWriter struct {
objectSvc.GetObjectStream
}
func (s *streamer) Recv() (*objectV2.GetResponse, error) {
r, err := s.stream.Recv()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive get response", s)
}
func (s *streamObjectWriter) WriteHeader(obj *object.Object) error {
p := new(objectV2.GetObjectPartInit)
switch v := r.(type) {
case *object.Object:
oV2 := v.ToV2()
objV2 := obj.ToV2()
p.SetObjectID(objV2.GetObjectID())
p.SetHeader(objV2.GetHeader())
p.SetSignature(objV2.GetSignature())
partInit := new(objectV2.GetObjectPartInit)
partInit.SetHeader(oV2.GetHeader())
partInit.SetSignature(oV2.GetSignature())
partInit.SetObjectID(oV2.GetObjectID())
s.body.SetObjectPart(partInit)
case []byte:
partChunk := new(objectV2.GetObjectPartChunk)
partChunk.SetChunk(v)
s.body.SetObjectPart(partChunk)
default:
panic(fmt.Sprintf("unexpected response type %T from %T", r, s.stream))
}
resp := new(objectV2.GetResponse)
resp.SetBody(s.body)
return resp, nil
return s.GetObjectStream.Send(newResponse(p))
}
func (s *streamObjectWriter) WriteChunk(chunk []byte) error {
p := new(objectV2.GetObjectPartChunk)
p.SetChunk(chunk)
return s.GetObjectStream.Send(newResponse(p))
}
func newResponse(p objectV2.GetObjectPart) *objectV2.GetResponse {
r := new(objectV2.GetResponse)
body := new(objectV2.GetResponseBody)
r.SetBody(body)
body.SetObjectPart(p)
return r
}

View file

@ -1,23 +1,62 @@
package getsvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/session"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
)
func toPrm(req *objectV2.GetRequest) *getsvc.Prm {
return new(getsvc.Prm).
WithAddress(
object.NewAddressFromV2(req.GetBody().GetAddress()),
).
WithCommonPrm(util.CommonPrmFromV2(req))
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
meta := req.GetMetaHeader()
key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken()))
if err != nil {
return nil, err
}
p := new(getsvc.Prm)
p.SetPrivateKey(key)
body := req.GetBody()
p.SetAddress(object.NewAddressFromV2(body.GetAddress()))
p.SetRaw(body.GetRaw())
p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...)
p.SetObjectWriter(&streamObjectWriter{stream})
return p, nil
}
func fromResponse(res *getsvc.Streamer) objectV2.GetObjectStreamer {
return &streamer{
stream: res,
body: new(objectV2.GetResponseBody),
// can be shared accross all services
func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption {
xHdrs := meta.GetXHeaders()
opts := make([]client.CallOption, 0, 3+len(xHdrs))
opts = append(opts,
client.WithBearer(token.NewBearerTokenFromV2(meta.GetBearerToken())),
client.WithSession(token.NewSessionTokenFromV2(meta.GetSessionToken())),
client.WithTTL(meta.GetTTL()-1),
)
for i := range xHdrs {
opts = append(opts, client.WithXHeader(pkg.NewXHeaderFromV2(xHdrs[i])))
}
return opts
}
func splitInfoResponse(info *object.SplitInfo) *objectV2.GetResponse {
resp := new(objectV2.GetResponse)
body := new(objectV2.GetResponseBody)
resp.SetBody(body)
body.SetObjectPart(info.ToV2())
return resp
}

View file

@ -9,10 +9,10 @@ import (
"github.com/pkg/errors"
)
type responseService struct {
type ResponseService struct {
respSvc *response.Service
svc object.Service
svc ServiceServer
}
type searchStreamResponser struct {
@ -20,7 +20,9 @@ type searchStreamResponser struct {
}
type getStreamResponser struct {
stream *response.ServerMessageStreamer
util.ServerStream
respWriter util.ResponseMessageWriter
}
type getRangeStreamResponser struct {
@ -33,42 +35,24 @@ type putStreamResponser struct {
// NewResponseService returns object service instance that passes internal service
// call to response service.
func NewResponseService(objSvc object.Service, respSvc *response.Service) object.Service {
return &responseService{
func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService {
return &ResponseService{
respSvc: respSvc,
svc: objSvc,
}
}
func (s *getStreamResponser) Recv() (*object.GetResponse, error) {
r, err := s.stream.Recv()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive response", s)
}
return r.(*object.GetResponse), nil
func (s *getStreamResponser) Send(resp *object.GetResponse) error {
return s.respWriter(resp)
}
func (s *responseService) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) {
stream, err := s.respSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.Get(ctx, req.(*object.GetRequest))
if err != nil {
return nil, err
}
return func() (util.ResponseMessage, error) {
return stream.Recv()
}, nil
},
)
if err != nil {
return nil, err
}
return &getStreamResponser{
stream: stream,
}, nil
func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) error {
return s.svc.Get(req, &getStreamResponser{
ServerStream: stream,
respWriter: s.respSvc.HandleServerStreamRequest_(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetResponse))
}),
})
}
func (s *putStreamResponser) Send(req *object.PutRequest) error {
@ -84,7 +68,7 @@ func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) {
return r.(*object.PutResponse), nil
}
func (s *responseService) Put(ctx context.Context) (object.PutObjectStreamer, error) {
func (s *ResponseService) Put(ctx context.Context) (object.PutObjectStreamer, error) {
stream, err := s.svc.Put(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not create Put object streamer")
@ -102,7 +86,7 @@ func (s *responseService) Put(ctx context.Context) (object.PutObjectStreamer, er
}, nil
}
func (s *responseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
return s.svc.Head(ctx, req.(*object.HeadRequest))
@ -124,7 +108,7 @@ func (s *searchStreamResponser) Recv() (*object.SearchResponse, error) {
return r.(*object.SearchResponse), nil
}
func (s *responseService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
func (s *ResponseService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
stream, err := s.respSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.Search(ctx, req.(*object.SearchRequest))
@ -146,7 +130,7 @@ func (s *responseService) Search(ctx context.Context, req *object.SearchRequest)
}, nil
}
func (s *responseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
@ -168,7 +152,7 @@ func (s *getRangeStreamResponser) Recv() (*object.GetRangeResponse, error) {
return r.(*object.GetRangeResponse), nil
}
func (s *responseService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
func (s *ResponseService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
stream, err := s.respSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest))
@ -190,7 +174,7 @@ func (s *responseService) GetRange(ctx context.Context, req *object.GetRangeRequ
}, nil
}
func (s *responseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))

View file

@ -0,0 +1,26 @@
package object
import (
"context"
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-node/pkg/services/util"
)
// GetObjectStream is an interface of NeoFS API v2 compatible object streamer.
type GetObjectStream interface {
util.ServerStream
Send(*object.GetResponse) error
}
// ServiceServer is an interface of utility
// serving v2 Object service.
type ServiceServer interface {
Get(*object.GetRequest, GetObjectStream) error
Put(context.Context) (object.PutObjectStreamer, error)
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
Search(context.Context, *object.SearchRequest) (object.SearchObjectStreamer, error)
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
GetRange(context.Context, *object.GetRangeRequest) (object.GetRangeObjectStreamer, error)
GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error)
}

View file

@ -9,12 +9,12 @@ import (
"github.com/pkg/errors"
)
type signService struct {
type SignService struct {
key *ecdsa.PrivateKey
sigSvc *util.SignService
svc object.Service
svc ServiceServer
}
type searchStreamSigner struct {
@ -22,7 +22,9 @@ type searchStreamSigner struct {
}
type getStreamSigner struct {
stream *util.ResponseMessageStreamer
util.ServerStream
respWriter util.ResponseMessageWriter
}
type putStreamSigner struct {
@ -33,43 +35,32 @@ type getRangeStreamSigner struct {
stream *util.ResponseMessageStreamer
}
func NewSignService(key *ecdsa.PrivateKey, svc object.Service) object.Service {
return &signService{
func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
return &SignService{
key: key,
sigSvc: util.NewUnarySignService(key),
svc: svc,
}
}
func (s *getStreamSigner) Recv() (*object.GetResponse, error) {
r, err := s.stream.Recv()
if err != nil {
return nil, errors.Wrap(err, "could not receive response")
}
return r.(*object.GetResponse), nil
func (s *getStreamSigner) Send(resp *object.GetResponse) error {
return s.respWriter(resp)
}
func (s *signService) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) {
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.Get(ctx, req.(*object.GetRequest))
if err != nil {
return nil, err
}
return func() (util.ResponseMessage, error) {
return stream.Recv()
}, nil
func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error {
respWriter, err := s.sigSvc.HandleServerStreamRequest_(req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetResponse))
},
)
if err != nil {
return nil, err
return err
}
return &getStreamSigner{
stream: stream,
}, nil
return s.svc.Get(req, &getStreamSigner{
ServerStream: stream,
respWriter: respWriter,
})
}
func (s *putStreamSigner) Send(req *object.PutRequest) error {
@ -85,7 +76,7 @@ func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
return r.(*object.PutResponse), nil
}
func (s *signService) Put(ctx context.Context) (object.PutObjectStreamer, error) {
func (s *SignService) Put(ctx context.Context) (object.PutObjectStreamer, error) {
stream, err := s.svc.Put(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not create Put object streamer")
@ -103,7 +94,7 @@ func (s *signService) Put(ctx context.Context) (object.PutObjectStreamer, error)
}, nil
}
func (s *signService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
return s.svc.Head(ctx, req.(*object.HeadRequest))
@ -125,7 +116,7 @@ func (s *searchStreamSigner) Recv() (*object.SearchResponse, error) {
return r.(*object.SearchResponse), nil
}
func (s *signService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
func (s *SignService) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.Search(ctx, req.(*object.SearchRequest))
@ -147,7 +138,7 @@ func (s *signService) Search(ctx context.Context, req *object.SearchRequest) (ob
}, nil
}
func (s *signService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
return s.svc.Delete(ctx, req.(*object.DeleteRequest))
@ -169,7 +160,7 @@ func (s *getRangeStreamSigner) Recv() (*object.GetRangeResponse, error) {
return r.(*object.GetRangeResponse), nil
}
func (s *signService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
func (s *SignService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest))
@ -191,7 +182,7 @@ func (s *signService) GetRange(ctx context.Context, req *object.GetRangeRequest)
}, nil
}
func (s *signService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessage, error) {
return s.svc.GetRangeHash(ctx, req.(*object.GetRangeHashRequest))

View file

@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-node/pkg/services/util"
"github.com/pkg/errors"
)
@ -15,16 +16,17 @@ var (
type (
TransportSplitter struct {
next object.Service
next ServiceServer
chunkSize uint64
addrAmount uint64
}
getStreamBasicChecker struct {
next object.GetObjectStreamer
buf *bytes.Buffer
resp *object.GetResponse
getStreamMsgSizeCtrl struct {
util.ServerStream
stream GetObjectStream
chunkSize int
}
@ -43,7 +45,37 @@ type (
}
)
func NewTransportSplitter(size, amount uint64, next object.Service) *TransportSplitter {
func (s *getStreamMsgSizeCtrl) Send(resp *object.GetResponse) error {
body := resp.GetBody()
part := body.GetObjectPart()
chunkPart, ok := part.(*object.GetObjectPartChunk)
if !ok {
return s.stream.Send(resp)
}
var newResp *object.GetResponse
for buf := bytes.NewBuffer(chunkPart.GetChunk()); buf.Len() > 0; {
if newResp == nil {
newResp = new(object.GetResponse)
newResp.SetBody(body)
}
chunkPart.SetChunk(buf.Next(s.chunkSize))
newResp.SetMetaHeader(resp.GetMetaHeader())
newResp.SetVerificationHeader(resp.GetVerificationHeader())
if err := s.stream.Send(newResp); err != nil {
return err
}
}
return nil
}
func NewTransportSplitter(size, amount uint64, next ServiceServer) *TransportSplitter {
return &TransportSplitter{
next: next,
chunkSize: size,
@ -51,13 +83,12 @@ func NewTransportSplitter(size, amount uint64, next object.Service) *TransportSp
}
}
func (c TransportSplitter) Get(ctx context.Context, request *object.GetRequest) (object.GetObjectStreamer, error) {
stream, err := c.next.Get(ctx, request)
return &getStreamBasicChecker{
next: stream,
chunkSize: int(c.chunkSize),
}, err
func (c *TransportSplitter) Get(req *object.GetRequest, stream GetObjectStream) error {
return c.next.Get(req, &getStreamMsgSizeCtrl{
ServerStream: stream,
stream: stream,
chunkSize: int(c.chunkSize),
})
}
func (c TransportSplitter) Put(ctx context.Context) (object.PutObjectStreamer, error) {
@ -94,40 +125,6 @@ func (c TransportSplitter) GetRangeHash(ctx context.Context, request *object.Get
return c.next.GetRangeHash(ctx, request)
}
func (g *getStreamBasicChecker) Recv() (*object.GetResponse, error) {
if g.resp == nil {
resp, err := g.next.Recv()
if err != nil {
return resp, err
}
if part, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk); !ok {
return resp, err
} else {
g.resp = resp
g.buf = bytes.NewBuffer(part.GetChunk())
}
}
chunkBody := new(object.GetObjectPartChunk)
chunkBody.SetChunk(g.buf.Next(g.chunkSize))
body := new(object.GetResponseBody)
body.SetObjectPart(chunkBody)
resp := new(object.GetResponse)
resp.SetVerificationHeader(g.resp.GetVerificationHeader())
resp.SetMetaHeader(g.resp.GetMetaHeader())
resp.SetBody(body)
if g.buf.Len() == 0 {
g.buf = nil
g.resp = nil
}
return resp, nil
}
func (r *rangeStreamBasicChecker) Recv() (*object.GetRangeResponse, error) {
if r.resp == nil {
resp, err := r.next.Recv()

View file

@ -5,7 +5,6 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
"github.com/pkg/errors"
)
// KeyStorage represents private key storage of the local node.
@ -30,11 +29,9 @@ func NewKeyStorage(localKey *ecdsa.PrivateKey, tokenStore *storage.TokenStore) *
func (s *KeyStorage) GetKey(token *token.SessionToken) (*ecdsa.PrivateKey, error) {
if token != nil {
pToken := s.tokenStore.Get(token.OwnerID(), token.ID())
if pToken == nil {
return nil, errors.Wrapf(storage.ErrNotFound, "(%T) could not get session key", s)
if pToken != nil {
return pToken.SessionKey(), nil
}
return pToken.SessionKey(), nil
}
return s.key, nil

View file

@ -1,45 +0,0 @@
package util
import (
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
type localPlacement struct {
builder placement.Builder
localAddrSrc network.LocalAddressSource
}
func NewLocalPlacement(b placement.Builder, s network.LocalAddressSource) placement.Builder {
return &localPlacement{
builder: b,
localAddrSrc: s,
}
}
func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmap.PlacementPolicy) ([]netmap.Nodes, error) {
vs, err := p.builder.BuildPlacement(addr, policy)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not build object placement", p)
}
for i := range vs {
for j := range vs[i] {
addr, err := network.AddressFromString(vs[i][j].Address())
if err != nil {
// TODO: log error
continue
}
if network.IsLocalAddress(p.localAddrSrc, addr) {
return []netmap.Nodes{{vs[i][j]}}, nil
}
}
}
return nil, errors.Errorf("(%T) local node is outside of object placement", p)
}

View file

@ -0,0 +1,155 @@
package util
import (
netmapSDK "github.com/nspcc-dev/neofs-api-go/pkg/netmap"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
type localPlacement struct {
builder placement.Builder
localAddrSrc network.LocalAddressSource
}
type remotePlacement struct {
builder placement.Builder
localAddrSrc network.LocalAddressSource
}
// TraverserGenerator represents tool that generates
// container traverser for the particular need.
type TraverserGenerator struct {
netMapSrc netmap.Source
cnrSrc container.Source
localAddrSrc network.LocalAddressSource
customOpts []placement.Option
}
func NewLocalPlacement(b placement.Builder, s network.LocalAddressSource) placement.Builder {
return &localPlacement{
builder: b,
localAddrSrc: s,
}
}
func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.PlacementPolicy) ([]netmapSDK.Nodes, error) {
vs, err := p.builder.BuildPlacement(addr, policy)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not build object placement", p)
}
for i := range vs {
for j := range vs[i] {
addr, err := network.AddressFromString(vs[i][j].Address())
if err != nil {
// TODO: log error
continue
}
if network.IsLocalAddress(p.localAddrSrc, addr) {
return []netmapSDK.Nodes{{vs[i][j]}}, nil
}
}
}
return nil, errors.Errorf("(%T) local node is outside of object placement", p)
}
// NewRemotePlacementBuilder creates, initializes and returns placement builder that
// excludes local node from any placement vector.
func NewRemotePlacementBuilder(b placement.Builder, s network.LocalAddressSource) placement.Builder {
return &remotePlacement{
builder: b,
localAddrSrc: s,
}
}
func (p *remotePlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.PlacementPolicy) ([]netmapSDK.Nodes, error) {
vs, err := p.builder.BuildPlacement(addr, policy)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not build object placement", p)
}
for i := range vs {
for j := 0; j < len(vs[i]); j++ {
addr, err := network.AddressFromString(vs[i][j].Address())
if err != nil {
// TODO: log error
continue
}
if network.IsLocalAddress(p.localAddrSrc, addr) {
vs[i] = append(vs[i][:j], vs[i][j+1:]...)
j--
}
}
}
return vs, nil
}
// NewTraverserGenerator creates, initializes and returns new TraverserGenerator instance.
func NewTraverserGenerator(nmSrc netmap.Source, cnrSrc container.Source, localAddrSrc network.LocalAddressSource) *TraverserGenerator {
return &TraverserGenerator{
netMapSrc: nmSrc,
cnrSrc: cnrSrc,
localAddrSrc: localAddrSrc,
}
}
// WithTraverseOptions returns TraverseGenerator that additionally applies provided options.
func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *TraverserGenerator {
return &TraverserGenerator{
netMapSrc: g.netMapSrc,
cnrSrc: g.cnrSrc,
localAddrSrc: g.localAddrSrc,
customOpts: opts,
}
}
// GenerateTraverser generates placement Traverser for provided object address.
func (g *TraverserGenerator) GenerateTraverser(addr *object.Address) (*placement.Traverser, error) {
// get latest network map
nm, err := netmap.GetLatestNetworkMap(g.netMapSrc)
if err != nil {
return nil, errors.Wrapf(err, "could not get latest network map")
}
// get container related container
cnr, err := g.cnrSrc.Get(addr.ContainerID())
if err != nil {
return nil, errors.Wrap(err, "could not get container")
}
// allocate placement traverser options
traverseOpts := make([]placement.Option, 0, 3+len(g.customOpts))
traverseOpts = append(traverseOpts, g.customOpts...)
// create builder of the remote nodes from network map
builder := NewRemotePlacementBuilder(
placement.NewNetworkMapBuilder(nm),
g.localAddrSrc,
)
traverseOpts = append(traverseOpts,
// set processing container
placement.ForContainer(cnr),
// set identifier of the processing object
placement.ForObject(addr.ObjectID()),
// set placement builder
placement.UseBuilder(builder),
)
return placement.NewTraverser(traverseOpts...)
}

View file

@ -40,3 +40,11 @@ func (s *Service) HandleServerStreamRequest(ctx context.Context, req interface{}
recv: msgRdr,
}, nil
}
func (s *Service) HandleServerStreamRequest_(respWriter util.ResponseMessageWriter) util.ResponseMessageWriter {
return func(resp util.ResponseMessage) error {
setMeta(resp, s.cfg)
return respWriter(resp)
}
}

View file

@ -0,0 +1,10 @@
package util
import (
"context"
)
// ServerStream is an interface of server-side stream v2.
type ServerStream interface {
Context() context.Context
}

View file

@ -21,6 +21,10 @@ type SignService struct {
key *ecdsa.PrivateKey
}
type ResponseMessageWriter func(ResponseMessage) error
type ServerStreamHandler_ func(interface{}, ResponseMessageWriter) (ResponseMessageWriter, error)
type ServerStreamHandler func(context.Context, interface{}) (ResponseMessageReader, error)
type ResponseMessageReader func() (ResponseMessage, error)
@ -109,6 +113,21 @@ func (s *SignService) HandleServerStreamRequest(ctx context.Context, req interfa
}, nil
}
func (s *SignService) HandleServerStreamRequest_(req interface{}, respWriter ResponseMessageWriter) (ResponseMessageWriter, error) {
// verify request signatures
if err := signature.VerifyServiceMessage(req); err != nil {
return nil, errors.Wrap(err, "could not verify request")
}
return func(resp ResponseMessage) error {
if err := signature.SignServiceMessage(s.key, resp); err != nil {
return errors.Wrap(err, "could not sign response message")
}
return respWriter(resp)
}, nil
}
func (s *SignService) HandleUnaryRequest(ctx context.Context, req interface{}, handler UnaryHandler) (ResponseMessage, error) {
// verify request signatures
if err := signature.VerifyServiceMessage(req); err != nil {