[#235] services/object: Implement new GetRange algorithm

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-12-07 20:49:47 +03:00 committed by Alex Vanin
parent 91d8e0a4de
commit 1d23483828
37 changed files with 703 additions and 1125 deletions

View file

@ -28,8 +28,6 @@ import (
headsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/head/v2"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
rangesvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/range/v2"
rangehashsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash"
rangehashsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/rangehash/v2"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
@ -52,8 +50,6 @@ type objectSvc struct {
get *getsvcV2.Service
rng *rangesvcV2.Service
rngHash *rangehashsvcV2.Service
delete *deletesvcV2.Service
@ -163,8 +159,8 @@ func (s *objectSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*obj
return s.delete.Delete(ctx, req)
}
func (s *objectSvc) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
return s.rng.GetRange(ctx, req)
func (s *objectSvc) GetRange(req *object.GetRangeRequest, stream objectService.GetObjectRangeStream) error {
return s.get.GetRange(req, stream)
}
func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
@ -354,25 +350,6 @@ func initObjectService(c *cfg) {
headsvcV2.WithInternalService(sHead),
)
sRange := rangesvc.NewService(
rangesvc.WithKeyStorage(keyStorage),
rangesvc.WithClientCache(clientCache),
rangesvc.WithLocalStorage(ls),
rangesvc.WithContainerSource(c.cfgObject.cnrStorage),
rangesvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
rangesvc.WithLocalAddressSource(c),
rangesvc.WithWorkerPool(c.cfgObject.pool.rng),
rangesvc.WithHeadService(sHead),
rangesvc.WithLogger(c.log),
rangesvc.WithClientOptions(
client.WithDialTimeout(c.viper.GetDuration(cfgObjectRangeDialTimeout)),
),
)
sRangeV2 := rangesvcV2.NewService(
rangesvcV2.WithInternalService(sRange),
)
sGet := getsvc.New(
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
@ -401,7 +378,7 @@ func initObjectService(c *cfg) {
rangehashsvc.WithNetworkMapSource(c.cfgObject.netMapStorage),
rangehashsvc.WithLocalAddressSource(c),
rangehashsvc.WithHeadService(sHead),
rangehashsvc.WithRangeService(sRange),
rangehashsvc.WithRangeService(sGet),
rangehashsvc.WithWorkerPool(c.cfgObject.pool.rngHash),
rangehashsvc.WithLogger(c.log),
rangehashsvc.WithClientOptions(
@ -451,7 +428,6 @@ func initObjectService(c *cfg) {
put: sPutV2,
search: sSearchV2,
head: sHeadV2,
rng: sRangeV2,
get: sGetV2,
rngHash: sRangeHashV2,
delete: sDeleteV2,

2
go.mod
View file

@ -17,7 +17,7 @@ require (
github.com/multiformats/go-multihash v0.0.13 // indirect
github.com/nspcc-dev/hrw v1.0.9
github.com/nspcc-dev/neo-go v0.91.1-pre.0.20201030072836-71216865717b
github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201203150742-6db6b569e098
github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201208072327-139660c6ff59
github.com/nspcc-dev/neofs-crypto v0.3.0
github.com/nspcc-dev/tzhash v1.4.0
github.com/panjf2000/ants/v2 v2.3.0

4
go.sum
View file

@ -283,8 +283,8 @@ github.com/nspcc-dev/neo-go v0.73.1-pre.0.20200303142215-f5a1b928ce09/go.mod h1:
github.com/nspcc-dev/neo-go v0.91.0/go.mod h1:G6HdOWvzQ6tlvFdvFSN/PgCzLPN/X/X4d5hTjFRUDcc=
github.com/nspcc-dev/neo-go v0.91.1-pre.0.20201030072836-71216865717b h1:gk5bZgpWOehaDVKI5vBDkcjXTpRkKqcvIb1h/vHnBH4=
github.com/nspcc-dev/neo-go v0.91.1-pre.0.20201030072836-71216865717b/go.mod h1:9s7LNp2lMgf0caH2t0sam4+WT2SIauXozwP0AdBqnEo=
github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201203150742-6db6b569e098 h1:lq17L0qNacW4cdJ4SqcnTmei2g/wg+36nkobznI7YAo=
github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201203150742-6db6b569e098/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201208072327-139660c6ff59 h1:OfeINXdkMrlfdqsVK5ECYi7vgt9TpO/R3JMQcnQndXI=
github.com/nspcc-dev/neofs-api-go v1.20.3-0.20201208072327-139660c6ff59/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8=
github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA=
github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=

View file

@ -53,7 +53,8 @@ func (r *HeadRes) Header() *object.Object {
// Returns any error encountered that
// did not allow to completely read the object header.
//
// Returns ErrNotFound if requested object is missing in local storage.
// Returns object.ErrNotFound if requested object is missing in local storage.
// Returns object.ErrAlreadyRemoved if requested object was inhumed.
func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) {
var (
head *object.Object

View file

@ -36,9 +36,9 @@ func (p *RngPrm) WithAddress(addr *objectSDK.Address) *RngPrm {
//
// Missing an option or calling with zero length is equivalent
// to getting the full payload range.
func (p *RngPrm) WithPayloadRange(off, ln uint64) *RngPrm {
func (p *RngPrm) WithPayloadRange(rng *objectSDK.Range) *RngPrm {
if p != nil {
p.off, p.ln = off, ln
p.off, p.ln = rng.GetOffset(), rng.GetLength()
}
return p
@ -111,7 +111,7 @@ func (e *StorageEngine) GetRange(prm *RngPrm) (*RngRes, error) {
func GetRange(storage *StorageEngine, addr *objectSDK.Address, rng *objectSDK.Range) ([]byte, error) {
res, err := storage.GetRange(new(RngPrm).
WithAddress(addr).
WithPayloadRange(rng.GetOffset(), rng.GetLength()),
WithPayloadRange(rng),
)
if err != nil {
return nil, err

View file

@ -0,0 +1,28 @@
package object
import (
"github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
)
type getRangeStreamerV2 struct {
objectGRPC.ObjectService_GetRangeServer
}
func (s *getRangeStreamerV2) Send(resp *object.GetRangeResponse) error {
return s.ObjectService_GetRangeServer.Send(
object.GetRangeResponseToGRPCMessage(resp),
)
}
// GetRange converts gRPC GetRangeRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) GetRange(req *objectGRPC.GetRangeRequest, gStream objectGRPC.ObjectService_GetRangeServer) error {
// TODO: think about how we transport errors through gRPC
return s.srv.GetRange(
object.GetRangeRequestFromGRPCMessage(req),
&getRangeStreamerV2{
ObjectService_GetRangeServer: gStream,
},
)
}

View file

@ -99,31 +99,6 @@ func (s *Server) Search(req *objectGRPC.SearchRequest, gStream objectGRPC.Object
}
}
// GetRange converts gRPC GetRangeRequest message, opens internal Object service Search stream and overtakes its data
// to gRPC stream.
func (s *Server) GetRange(req *objectGRPC.GetRangeRequest, gStream objectGRPC.ObjectService_GetRangeServer) error {
stream, err := s.srv.GetRange(gStream.Context(), object.GetRangeRequestFromGRPCMessage(req))
if err != nil {
// TODO: think about how we transport errors through gRPC
return err
}
for {
r, err := stream.Recv()
if err != nil {
if errors.Is(errors.Cause(err), io.EOF) {
return nil
}
return err
}
if err := gStream.Send(object.GetRangeResponseToGRPCMessage(r)); err != nil {
return err
}
}
}
// GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service.
func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) {
resp, err := s.srv.GetRangeHash(ctx, object.GetRangeHashRequestFromGRPCMessage(req))

View file

@ -45,6 +45,14 @@ type (
*eACLCfg
}
rangeStreamBasicChecker struct {
objectSvc.GetObjectRangeStream
info requestInfo
*eACLCfg
}
searchStreamBasicChecker struct {
object.SearchObjectStreamer
}
@ -262,13 +270,10 @@ func (b Service) Delete(
return b.next.Delete(ctx, request)
}
func (b Service) GetRange(
ctx context.Context,
request *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
func (b Service) GetRange(request *object.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error {
cid, err := getContainerIDFromRequest(request)
if err != nil {
return nil, err
return err
}
req := metaWithToken{
@ -279,17 +284,20 @@ func (b Service) GetRange(
reqInfo, err := b.findRequestInfo(req, cid, acl.OperationRange)
if err != nil {
return nil, err
return err
}
if !basicACLCheck(reqInfo) {
return nil, basicACLErr(reqInfo)
return basicACLErr(reqInfo)
} else if !eACLCheck(request, reqInfo, b.eACLCfg) {
return nil, eACLErr(reqInfo)
return eACLErr(reqInfo)
}
stream, err := b.next.GetRange(ctx, request)
return getRangeStreamBasicChecker{stream}, err
return b.next.GetRange(request, &rangeStreamBasicChecker{
GetObjectRangeStream: stream,
info: reqInfo,
eACLCfg: b.eACLCfg,
})
}
func (b Service) GetRangeHash(
@ -374,6 +382,14 @@ func (g *getStreamBasicChecker) Send(resp *object.GetResponse) error {
return g.GetObjectStream.Send(resp)
}
func (g *rangeStreamBasicChecker) Send(resp *object.GetRangeResponse) error {
if !eACLCheck(resp, g.info, g.eACLCfg) {
return eACLErr(g.info)
}
return g.GetObjectRangeStream.Send(resp)
}
func (b Service) findRequestInfo(
req metaWithToken,
cid *container.ID,

View file

@ -119,17 +119,16 @@ func (h *headerSource) objectHeaders() ([]eacl.Header, bool) {
var hdr *objectV2.Header
switch v := resp.GetBody().GetHeaderPart().(type) {
case *objectV2.GetHeaderPartShort:
case *objectV2.ShortHeader:
hdr = new(objectV2.Header)
h := v.GetShortHeader()
hdr.SetVersion(h.GetVersion())
hdr.SetCreationEpoch(h.GetCreationEpoch())
hdr.SetOwnerID(h.GetOwnerID())
hdr.SetObjectType(h.GetObjectType())
hdr.SetPayloadLength(h.GetPayloadLength())
case *objectV2.GetHeaderPartFull:
hdr = v.GetHeaderWithSignature().GetHeader()
hdr.SetVersion(v.GetVersion())
hdr.SetCreationEpoch(v.GetCreationEpoch())
hdr.SetOwnerID(v.GetOwnerID())
hdr.SetObjectType(v.GetObjectType())
hdr.SetPayloadLength(v.GetPayloadLength())
case *objectV2.HeaderWithSignature:
hdr = v.GetHeader()
}
oV2.SetHeader(hdr)

View file

@ -24,8 +24,19 @@ func (exec *execCtx) assemble() {
prev, children := exec.initFromChild(childID)
if len(children) > 0 {
if ok := exec.writeCollectedHeader(); ok {
exec.overtakePayloadDirectly(children)
if exec.ctxRange() == nil {
if ok := exec.writeCollectedHeader(); ok {
exec.overtakePayloadDirectly(children, nil)
}
} else {
// TODO: choose one-by-one restoring algorithm according to size
// * if size > MAX => go right-to-left with HEAD and back with GET
// * else go right-to-left with GET and compose in single object before writing
if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok {
// payload of all children except the last are written, write last payload
exec.writeObjectPayload(exec.collectedObject)
}
}
} else if prev != nil {
if ok := exec.writeCollectedHeader(); ok {
@ -39,9 +50,6 @@ func (exec *execCtx) assemble() {
}
}
} else {
exec.status = statusUndefined
exec.err = object.ErrNotFound
exec.log.Debug("could not init parent from child")
}
}
@ -51,8 +59,9 @@ func (exec *execCtx) initFromChild(id *objectSDK.ID) (prev *objectSDK.ID, childr
log.Debug("starting assembling from child")
child, ok := exec.getChild(id)
child, ok := exec.getChild(id, nil)
if !ok {
return
}
@ -75,14 +84,48 @@ func (exec *execCtx) initFromChild(id *objectSDK.ID) (prev *objectSDK.ID, childr
}
exec.collectedObject = par
object.NewRawFromObject(exec.collectedObject).SetPayload(child.Payload())
var payload []byte
if rng := exec.ctxRange(); rng != nil {
seekLen := rng.GetLength()
seekOff := rng.GetOffset()
parSize := par.PayloadSize()
if seekOff+seekLen > parSize {
exec.status = statusOutOfRange
exec.err = object.ErrRangeOutOfBounds
return
}
childSize := child.PayloadSize()
if to := seekOff + seekLen; childSize > 0 && to > parSize-childSize {
pref := to + childSize - parSize
payload = child.Payload()[:pref]
rng.SetLength(rng.GetLength() - pref)
}
exec.curOff = parSize - childSize
} else {
payload = child.Payload()
}
object.NewRawFromObject(exec.collectedObject).SetPayload(payload)
return child.PreviousID(), child.Children()
}
func (exec *execCtx) overtakePayloadDirectly(children []*objectSDK.ID) {
func (exec *execCtx) overtakePayloadDirectly(children []*objectSDK.ID, rngs []*objectSDK.Range) {
withRng := len(rngs) > 0
for i := range children {
child, ok := exec.getChild(children[i])
var r *objectSDK.Range
if withRng {
r = rngs[i]
}
child, ok := exec.getChild(children[i], r)
if !ok {
return
}
@ -97,26 +140,23 @@ func (exec *execCtx) overtakePayloadDirectly(children []*objectSDK.ID) {
}
func (exec *execCtx) overtakePayloadInReverse(prev *objectSDK.ID) bool {
chain := make([]*objectSDK.ID, 0)
// fill the chain end-to-start
for prev != nil {
head, ok := exec.headChild(prev)
if !ok {
return false
}
chain = append(chain, head.ID())
prev = head.PreviousID()
chain, rngs := exec.buildChainInReverse(prev)
if len(chain) == 0 {
return false
}
reverseRngs := len(rngs) > 0
// reverse chain
for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 {
chain[left], chain[right] = chain[right], chain[left]
if reverseRngs {
rngs[left], rngs[right] = rngs[right], rngs[left]
}
}
exec.overtakePayloadDirectly(chain)
exec.overtakePayloadDirectly(chain, rngs)
exec.status = statusOK
exec.err = nil
@ -124,6 +164,59 @@ func (exec *execCtx) overtakePayloadInReverse(prev *objectSDK.ID) bool {
return true
}
func (exec *execCtx) buildChainInReverse(prev *objectSDK.ID) ([]*objectSDK.ID, []*objectSDK.Range) {
var (
chain = make([]*objectSDK.ID, 0)
rngs = make([]*objectSDK.Range, 0)
seekRng = exec.ctxRange()
from = seekRng.GetOffset()
to = from + seekRng.GetLength()
)
// fill the chain end-to-start
for prev != nil {
if exec.curOff < from {
break
}
head, ok := exec.headChild(prev)
if !ok {
return nil, nil
}
if seekRng != nil {
sz := head.PayloadSize()
exec.curOff -= sz
if exec.curOff < from+to {
off := uint64(0)
if from > exec.curOff {
off = from - exec.curOff
sz -= from - exec.curOff
}
if to < exec.curOff+off+sz {
sz = to - off - exec.curOff
}
r := objectSDK.NewRange()
r.SetOffset(off)
r.SetLength(sz)
rngs = append(rngs, r)
chain = append(chain, head.ID())
}
} else {
chain = append(chain, head.ID())
}
prev = head.PreviousID()
}
return chain, rngs
}
func equalAddresses(a, b *objectSDK.Address) bool {
return a.ContainerID().Equal(b.ContainerID()) &&
a.ObjectID().Equal(b.ObjectID())

View file

@ -24,7 +24,7 @@ type execCtx struct {
ctx context.Context
prm Prm
prm RangePrm
statusError
@ -33,6 +33,8 @@ type execCtx struct {
log *logger.Logger
collectedObject *object.Object
curOff uint64
}
const (
@ -40,11 +42,17 @@ const (
statusOK
statusINHUMED
statusVIRTUAL
statusOutOfRange
)
func (exec *execCtx) setLogger(l *logger.Logger) {
req := "GET"
if exec.ctxRange() != nil {
req = "GET_RANGE"
}
exec.log = l.With(
zap.String("request", "GET"),
zap.String("request", req),
zap.Stringer("address", exec.address()),
zap.Bool("raw", exec.isRaw()),
zap.Bool("local", exec.isLocal()),
@ -62,11 +70,11 @@ func (exec execCtx) isLocal() bool {
}
func (exec execCtx) isRaw() bool {
return exec.prm.raw
return exec.prm.RawFlag()
}
func (exec execCtx) address() *objectSDK.Address {
return exec.prm.addr
return exec.prm.Address()
}
func (exec execCtx) key() *ecdsa.PrivateKey {
@ -79,8 +87,8 @@ func (exec execCtx) callOptions() []client.CallOption {
func (exec execCtx) remotePrm() *client.GetObjectParams {
return new(client.GetObjectParams).
WithAddress(exec.prm.addr).
WithRawFlag(exec.prm.raw)
WithAddress(exec.prm.Address()).
WithRawFlag(exec.prm.RawFlag())
}
func (exec *execCtx) canAssemble() bool {
@ -95,6 +103,10 @@ func (exec *execCtx) containerID() *container.ID {
return exec.address().ContainerID()
}
func (exec *execCtx) ctxRange() *objectSDK.Range {
return exec.prm.rng
}
func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr)
@ -113,18 +125,19 @@ func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Trav
}
}
func (exec *execCtx) getChild(id *objectSDK.ID) (*object.Object, bool) {
func (exec *execCtx) getChild(id *objectSDK.ID, rng *objectSDK.Range) (*object.Object, bool) {
w := newSimpleObjectWriter()
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.SetObjectWriter(w)
p.objWriter = w
p.SetRange(rng)
addr := objectSDK.NewAddress()
addr.SetContainerID(exec.address().ContainerID())
addr.SetObjectID(id)
p.SetAddress(addr)
p.WithAddress(addr)
exec.statusError = exec.svc.get(exec.context(), p)
@ -138,9 +151,11 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) {
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.SetAddress(childAddr)
p.WithAddress(childAddr)
header, err := exec.svc.headSvc.head(exec.context(), p)
header, err := exec.svc.headSvc.head(exec.context(), Prm{
commonPrm: p.commonPrm,
})
switch {
default:
@ -204,6 +219,10 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
}
func (exec *execCtx) writeCollectedHeader() bool {
if exec.ctxRange() != nil {
return true
}
err := exec.prm.objWriter.WriteHeader(
object.NewRawFromObject(exec.collectedObject).CutPayload().Object(),
)

View file

@ -9,10 +9,17 @@ import (
// Get serves a request to get an object by address, and returns Streamer instance.
func (s *Service) Get(ctx context.Context, prm Prm) error {
return s.get(ctx, RangePrm{
commonPrm: prm.commonPrm,
}).err
}
// GetRange serves a request to get an object by address, and returns Streamer instance.
func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
return s.get(ctx, prm).err
}
func (s *Service) get(ctx context.Context, prm Prm) statusError {
func (s *Service) get(ctx context.Context, prm RangePrm) statusError {
exec := &execCtx{
svc: s,
ctx: ctx,
@ -46,6 +53,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
case statusVIRTUAL:
exec.log.Debug("requested object is virtual")
exec.assemble()
case statusOutOfRange:
exec.log.Debug("requested range is out of object bounds")
default:
exec.log.Debug("operation finished with error",
zap.String("error", exec.err.Error()),

View file

@ -93,17 +93,21 @@ func newTestClient() *testClient {
}
}
func (c *testClient) GetObject(_ context.Context, p Prm) (*objectSDK.Object, error) {
v, ok := c.results[p.addr.String()]
func (c *testClient) GetObject(_ context.Context, p RangePrm) (*objectSDK.Object, error) {
v, ok := c.results[p.Address().String()]
if !ok {
return nil, object.ErrNotFound
}
return v.obj.Object().SDK(), v.err
if v.err != nil {
return nil, v.err
}
return cutToRange(v.obj.Object(), p.rng).SDK(), nil
}
func (c *testClient) head(_ context.Context, p Prm) (*object.Object, error) {
v, ok := c.results[p.addr.String()]
v, ok := c.results[p.Address().String()]
if !ok {
return nil, object.ErrNotFound
}
@ -122,11 +126,11 @@ func (c *testClient) addResult(addr *objectSDK.Address, obj *object.RawObject, e
}{obj: obj, err: err}
}
func (s *testStorage) Get(addr *objectSDK.Address) (*object.Object, error) {
func (s *testStorage) Get(p RangePrm) (*object.Object, error) {
var (
ok bool
obj *object.Object
sAddr = addr.String()
sAddr = p.Address().String()
)
if _, ok = s.inhumed[sAddr]; ok {
@ -138,12 +142,30 @@ func (s *testStorage) Get(addr *objectSDK.Address) (*object.Object, error) {
}
if obj, ok = s.phy[sAddr]; ok {
return obj, nil
return cutToRange(obj, p.rng), nil
}
return nil, object.ErrNotFound
}
func cutToRange(o *object.Object, rng *objectSDK.Range) *object.Object {
obj := object.NewRawFromObject(o)
if rng == nil {
return obj.Object()
}
from := rng.GetOffset()
to := from + rng.GetLength()
payload := obj.Payload()
obj = obj.CutPayload()
obj.SetPayload(payload[from:to])
return obj.Object()
}
func (s *testStorage) addPhy(addr *objectSDK.Address, obj *object.RawObject) {
s.phy[addr.String()] = obj.Object()
}
@ -204,11 +226,27 @@ func TestGetLocalOnly(t *testing.T) {
}
newPrm := func(raw bool, w ObjectWriter) Prm {
return Prm{
objWriter: w,
raw: raw,
common: new(util.CommonPrm).WithLocalOnly(true),
}
p := Prm{}
p.SetObjectWriter(w)
p.WithRawFlag(raw)
p.common = new(util.CommonPrm).WithLocalOnly(true)
return p
}
newRngPrm := func(raw bool, w ChunkWriter, off, ln uint64) RangePrm {
p := RangePrm{}
p.SetChunkWriter(w)
p.WithRawFlag(raw)
p.common = new(util.CommonPrm).WithLocalOnly(true)
r := objectSDK.NewRange()
r.SetOffset(off)
r.SetLength(ln)
p.SetRange(r)
return p
}
t.Run("OK", func(t *testing.T) {
@ -218,12 +256,16 @@ func TestGetLocalOnly(t *testing.T) {
w := newSimpleObjectWriter()
p := newPrm(false, w)
payloadSz := uint64(10)
payload := make([]byte, payloadSz)
rand.Read(payload)
addr := generateAddress()
obj := generateObject(addr, nil, nil)
obj := generateObject(addr, nil, payload)
storage.addPhy(addr, obj)
p.addr = addr
p.WithAddress(addr)
storage.addPhy(addr, obj)
@ -232,6 +274,15 @@ func TestGetLocalOnly(t *testing.T) {
require.NoError(t, err)
require.Equal(t, obj.Object(), w.object())
w = newSimpleObjectWriter()
rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.NoError(t, err)
require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.object().Payload())
})
t.Run("INHUMED", func(t *testing.T) {
@ -244,11 +295,17 @@ func TestGetLocalOnly(t *testing.T) {
storage.inhume(addr)
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
rngPrm := newRngPrm(false, nil, 0, 0)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
})
t.Run("404", func(t *testing.T) {
@ -259,11 +316,18 @@ func TestGetLocalOnly(t *testing.T) {
addr := generateAddress()
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
rngPrm := newRngPrm(false, nil, 0, 0)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("VIRTUAL", func(t *testing.T) {
@ -279,7 +343,7 @@ func TestGetLocalOnly(t *testing.T) {
splitInfo.SetLink(generateID())
splitInfo.SetLastPart(generateID())
p.addr = addr
p.WithAddress(addr)
storage.addVirtual(addr, splitInfo)
@ -290,6 +354,13 @@ func TestGetLocalOnly(t *testing.T) {
require.True(t, errors.As(err, &errSplit))
require.Equal(t, splitInfo, errSplit.SplitInfo())
rngPrm := newRngPrm(true, nil, 0, 0)
rngPrm.WithAddress(addr)
err = svc.Get(ctx, p)
require.True(t, errors.As(err, &errSplit))
})
}
@ -347,6 +418,7 @@ func generateChain(ln int, cid *container.ID) ([]*object.RawObject, []*objectSDK
o := generateObject(addr, prevID, []byte{byte(i)})
o.SetPayload(payloadPart)
o.SetPayloadSize(uint64(len(payloadPart)))
o.SetID(curID)
payload = append(payload, payloadPart...)
@ -381,11 +453,27 @@ func TestGetRemoteSmall(t *testing.T) {
}
newPrm := func(raw bool, w ObjectWriter) Prm {
return Prm{
objWriter: w,
raw: raw,
common: new(util.CommonPrm).WithLocalOnly(false),
}
p := Prm{}
p.SetObjectWriter(w)
p.WithRawFlag(raw)
p.common = new(util.CommonPrm).WithLocalOnly(false)
return p
}
newRngPrm := func(raw bool, w ChunkWriter, off, ln uint64) RangePrm {
p := RangePrm{}
p.SetChunkWriter(w)
p.WithRawFlag(raw)
p.common = new(util.CommonPrm).WithLocalOnly(false)
r := objectSDK.NewRange()
r.SetOffset(off)
r.SetLength(ln)
p.SetRange(r)
return p
}
t.Run("OK", func(t *testing.T) {
@ -400,7 +488,11 @@ func TestGetRemoteSmall(t *testing.T) {
},
}
obj := generateObject(addr, nil, nil)
payloadSz := uint64(10)
payload := make([]byte, payloadSz)
rand.Read(payload)
obj := generateObject(addr, nil, payload)
c1 := newTestClient()
c1.addResult(addr, obj, nil)
@ -418,7 +510,7 @@ func TestGetRemoteSmall(t *testing.T) {
w := newSimpleObjectWriter()
p := newPrm(false, w)
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.NoError(t, err)
@ -429,6 +521,14 @@ func TestGetRemoteSmall(t *testing.T) {
err = svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, obj.Object(), w.object())
w = newSimpleObjectWriter()
rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.NoError(t, err)
require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.object().Payload())
})
t.Run("INHUMED", func(t *testing.T) {
@ -457,10 +557,16 @@ func TestGetRemoteSmall(t *testing.T) {
})
p := newPrm(false, nil)
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
rngPrm := newRngPrm(false, nil, 0, 0)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
})
t.Run("404", func(t *testing.T) {
@ -489,10 +595,16 @@ func TestGetRemoteSmall(t *testing.T) {
})
p := newPrm(false, nil)
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
rngPrm := newRngPrm(false, nil, 0, 0)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("VIRTUAL", func(t *testing.T) {
@ -534,10 +646,16 @@ func TestGetRemoteSmall(t *testing.T) {
})
p := newPrm(false, nil)
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
rngPrm := newRngPrm(false, nil, 0, 0)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("get chain element failure", func(t *testing.T) {
@ -546,6 +664,7 @@ func TestGetRemoteSmall(t *testing.T) {
addr.SetObjectID(generateID())
srcObj := generateObject(addr, nil, nil)
srcObj.SetPayloadSize(10)
ns, as := testNodeMatrix(t, []int{2})
@ -580,7 +699,7 @@ func TestGetRemoteSmall(t *testing.T) {
c2.addResult(addr, nil, objectSDK.NewSplitInfoError(splitInfo))
c2.addResult(linkAddr, linkingObj, nil)
c2.addResult(child1Addr, children[0], nil)
c2.addResult(child2Addr, nil, errors.New("any error"))
c2.addResult(child2Addr, nil, object.ErrNotFound)
builder := &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
@ -599,10 +718,18 @@ func TestGetRemoteSmall(t *testing.T) {
})
p := newPrm(false, newSimpleObjectWriter())
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
svc.headSvc = c2
rngPrm := newRngPrm(false, newSimpleObjectWriter(), 0, 1)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("OK", func(t *testing.T) {
@ -619,6 +746,7 @@ func TestGetRemoteSmall(t *testing.T) {
children, childIDs, payload := generateChain(2, cid)
srcObj.SetPayload(payload)
srcObj.SetPayloadSize(uint64(len(payload)))
linkAddr := objectSDK.NewAddress()
linkAddr.SetContainerID(cid)
@ -667,11 +795,26 @@ func TestGetRemoteSmall(t *testing.T) {
w := newSimpleObjectWriter()
p := newPrm(false, w)
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, srcObj.Object(), w.object())
svc.headSvc = c2
w = newSimpleObjectWriter()
payloadSz := srcObj.PayloadSize()
off := payloadSz / 3
ln := payloadSz / 3
rngPrm := newRngPrm(false, w, off, ln)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.NoError(t, err)
require.Equal(t, payload[off:off+ln], w.object().Payload())
})
})
@ -713,10 +856,16 @@ func TestGetRemoteSmall(t *testing.T) {
})
p := newPrm(false, nil)
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
rngPrm := newRngPrm(false, nil, 0, 0)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("get chain element failure", func(t *testing.T) {
@ -725,6 +874,7 @@ func TestGetRemoteSmall(t *testing.T) {
addr.SetObjectID(generateID())
srcObj := generateObject(addr, nil, nil)
srcObj.SetPayloadSize(10)
ns, as := testNodeMatrix(t, []int{2})
@ -757,7 +907,6 @@ func TestGetRemoteSmall(t *testing.T) {
addr.String(): ns,
rightAddr.String(): ns,
preRightAddr.String(): ns,
preRightAddr.String(): ns,
},
}
@ -773,10 +922,16 @@ func TestGetRemoteSmall(t *testing.T) {
svc.headSvc = headSvc
p := newPrm(false, newSimpleObjectWriter())
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
rngPrm := newRngPrm(false, nil, 0, 1)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("OK", func(t *testing.T) {
@ -792,6 +947,7 @@ func TestGetRemoteSmall(t *testing.T) {
splitInfo.SetLastPart(generateID())
children, _, payload := generateChain(2, cid)
srcObj.SetPayloadSize(uint64(len(payload)))
srcObj.SetPayload(payload)
rightObj := children[len(children)-1]
@ -836,11 +992,24 @@ func TestGetRemoteSmall(t *testing.T) {
w := newSimpleObjectWriter()
p := newPrm(false, w)
p.addr = addr
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, srcObj.Object(), w.object())
w = newSimpleObjectWriter()
payloadSz := srcObj.PayloadSize()
off := payloadSz / 3
ln := payloadSz / 3
rngPrm := newRngPrm(false, w, off, ln)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.NoError(t, err)
require.Equal(t, payload[off:off+ln], w.object().Payload())
})
})
})

View file

@ -10,7 +10,7 @@ import (
func (exec *execCtx) executeLocal() {
var err error
exec.collectedObject, err = exec.svc.localStorage.Get(exec.address())
exec.collectedObject, err = exec.svc.localStorage.Get(exec.prm)
var errSplitInfo *objectSDK.SplitInfoError
@ -33,5 +33,8 @@ func (exec *execCtx) executeLocal() {
exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
case errors.Is(err, object.ErrRangeOutOfBounds):
exec.status = statusOutOfRange
exec.err = object.ErrRangeOutOfBounds
}
}

View file

@ -11,6 +11,17 @@ import (
// Prm groups parameters of Get service call.
type Prm struct {
commonPrm
}
// RangePrm groups parameters of GetRange service call.
type RangePrm struct {
commonPrm
rng *objectSDK.Range
}
type commonPrm struct {
objWriter ObjectWriter
// TODO: replace key and callOpts to CommonPrm
@ -20,16 +31,19 @@ type Prm struct {
common *util.CommonPrm
// TODO: use parameters from NeoFS SDK
addr *objectSDK.Address
client.GetObjectParams
}
raw bool
// ChunkWriter is an interface of target component
// to write payload chunk.
type ChunkWriter interface {
WriteChunk([]byte) error
}
// ObjectWriter is an interface of target component to write object.
type ObjectWriter interface {
WriteHeader(*object.Object) error
WriteChunk([]byte) error
ChunkWriter
}
// SetObjectWriter sets target component to write the object.
@ -38,22 +52,23 @@ func (p *Prm) SetObjectWriter(w ObjectWriter) {
}
// SetPrivateKey sets private key to use during execution.
func (p *Prm) SetPrivateKey(key *ecdsa.PrivateKey) {
func (p *commonPrm) SetPrivateKey(key *ecdsa.PrivateKey) {
p.key = key
}
// SetRemoteCallOptions sets call options remote remote client calls.
func (p *Prm) SetRemoteCallOptions(opts ...client.CallOption) {
func (p *commonPrm) SetRemoteCallOptions(opts ...client.CallOption) {
p.callOpts = opts
}
// SetAddress sets address of the requested object.
func (p *Prm) SetAddress(addr *objectSDK.Address) {
p.addr = addr
// SetObjectWriter sets target component to write the object payload range.
func (p *RangePrm) SetChunkWriter(w ChunkWriter) {
p.objWriter = &rangeWriter{
chunkWriter: w,
}
}
// SetRaw sets raw flag. If flag is set,
// object assembling will not be undertaken.
func (p *Prm) SetRaw(raw bool) {
p.raw = raw
// SetRange sets range of the requested payload data.
func (p *RangePrm) SetRange(rng *objectSDK.Range) {
p.rng = rng
}

View file

@ -25,7 +25,7 @@ type Service struct {
type Option func(*cfg)
type getClient interface {
GetObject(context.Context, Prm) (*objectSDK.Object, error)
GetObject(context.Context, RangePrm) (*objectSDK.Object, error)
}
type cfg struct {
@ -38,7 +38,7 @@ type cfg struct {
}
localStorage interface {
Get(*objectSDK.Address) (*object.Object, error)
Get(RangePrm) (*object.Object, error)
}
clientCache interface {

View file

@ -15,7 +15,7 @@ import (
type simpleObjectWriter struct {
obj *object.RawObject
payload []byte
pld []byte
}
type clientCacheWrapper struct {
@ -36,20 +36,28 @@ type headSvcWrapper struct {
svc *headsvc.Service
}
type rangeWriter struct {
ObjectWriter
chunkWriter ChunkWriter
}
func newSimpleObjectWriter() *simpleObjectWriter {
return new(simpleObjectWriter)
return &simpleObjectWriter{
obj: object.NewRaw(),
}
}
func (s *simpleObjectWriter) WriteHeader(obj *object.Object) error {
s.obj = object.NewRawFromObject(obj)
s.payload = make([]byte, 0, obj.PayloadSize())
s.pld = make([]byte, 0, obj.PayloadSize())
return nil
}
func (s *simpleObjectWriter) WriteChunk(p []byte) error {
s.payload = append(s.payload, p...)
s.pld = append(s.pld, p...)
return nil
}
@ -58,8 +66,8 @@ func (s *simpleObjectWriter) Close() error {
}
func (s *simpleObjectWriter) object() *object.Object {
if len(s.payload) > 0 {
s.obj.SetPayload(s.payload)
if len(s.pld) > 0 {
s.obj.SetPayload(s.pld)
}
return s.obj.Object()
@ -73,31 +81,60 @@ func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (getClient,
}, err
}
func (c *clientWrapper) GetObject(ctx context.Context, p Prm) (*objectSDK.Object, error) {
func (c *clientWrapper) GetObject(ctx context.Context, p RangePrm) (*objectSDK.Object, error) {
// we don't specify payload writer because we accumulate
// the object locally (even huge).
return c.client.GetObject(ctx,
new(client.GetObjectParams).
WithAddress(p.addr).
WithRawFlag(true),
p.callOpts...,
)
if p.rng != nil {
data, err := c.client.ObjectPayloadRangeData(ctx,
new(client.RangeDataParams).
WithAddress(p.Address()).
WithRange(p.rng).
WithRaw(p.RawFlag()),
p.callOpts...,
)
if err != nil {
return nil, err
}
return payloadOnlyObject(data), nil
} else {
// we don't specify payload writer because we accumulate
// the object locally (even huge).
return c.client.GetObject(ctx,
new(client.GetObjectParams).
WithAddress(p.Address()).
WithRawFlag(p.RawFlag()),
p.callOpts...,
)
}
}
func (e *storageEngineWrapper) Get(addr *objectSDK.Address) (*object.Object, error) {
r, err := e.engine.Get(new(engine.GetPrm).
WithAddress(addr),
)
if err != nil {
return nil, err
}
func (e *storageEngineWrapper) Get(p RangePrm) (*object.Object, error) {
if p.rng != nil {
r, err := e.engine.GetRange(new(engine.RngPrm).
WithAddress(p.Address()).
WithPayloadRange(p.rng),
)
if err != nil {
return nil, err
}
return r.Object(), nil
return r.Object(), nil
} else {
r, err := e.engine.Get(new(engine.GetPrm).
WithAddress(p.Address()),
)
if err != nil {
return nil, err
}
return r.Object(), nil
}
}
func (s *headSvcWrapper) head(ctx context.Context, p Prm) (*object.Object, error) {
r, err := s.svc.Head(ctx, new(headsvc.Prm).
WithAddress(p.addr).
WithAddress(p.Address()).
WithCommonPrm(p.common).
Short(false),
)
@ -108,3 +145,14 @@ func (s *headSvcWrapper) head(ctx context.Context, p Prm) (*object.Object, error
return r.Header(), nil
}
func (w *rangeWriter) WriteChunk(p []byte) error {
return w.chunkWriter.WriteChunk(p)
}
func payloadOnlyObject(payload []byte) *objectSDK.Object {
rawObj := object.NewRaw()
rawObj.SetPayload(payload)
return rawObj.Object().SDK()
}

View file

@ -55,6 +55,25 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream
}
}
// GetRange calls internal service and returns v2 payload range stream.
func (s *Service) GetRange(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error {
p, err := s.toRangePrm(req, stream)
if err != nil {
return err
}
err = s.svc.GetRange(stream.Context(), *p)
var splitErr *object.SplitInfoError
switch {
case errors.As(err, &splitErr):
return stream.Send(splitInfoRangeResponse(splitErr.SplitInfo()))
default:
return err
}
}
func WithInternalService(v *getsvc.Service) Option {
return func(c *cfg) {
c.svc = v

View file

@ -10,6 +10,10 @@ type streamObjectWriter struct {
objectSvc.GetObjectStream
}
type streamObjectRangeWriter struct {
objectSvc.GetObjectRangeStream
}
func (s *streamObjectWriter) WriteHeader(obj *object.Object) error {
p := new(objectV2.GetObjectPartInit)
@ -38,3 +42,21 @@ func newResponse(p objectV2.GetObjectPart) *objectV2.GetResponse {
return r
}
func (s *streamObjectRangeWriter) WriteChunk(chunk []byte) error {
return s.GetObjectRangeStream.Send(newRangeResponse(chunk))
}
func newRangeResponse(p []byte) *objectV2.GetRangeResponse {
r := new(objectV2.GetRangeResponse)
body := new(objectV2.GetRangeResponseBody)
r.SetBody(body)
part := new(objectV2.GetRangePartChunk)
part.SetChunk(p)
body.SetRangePart(part)
return r
}

View file

@ -23,14 +23,35 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
p.SetPrivateKey(key)
body := req.GetBody()
p.SetAddress(object.NewAddressFromV2(body.GetAddress()))
p.SetRaw(body.GetRaw())
p.WithAddress(object.NewAddressFromV2(body.GetAddress()))
p.WithRawFlag(body.GetRaw())
p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...)
p.SetObjectWriter(&streamObjectWriter{stream})
return p, nil
}
func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) (*getsvc.RangePrm, error) {
meta := req.GetMetaHeader()
key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken()))
if err != nil {
return nil, err
}
p := new(getsvc.RangePrm)
p.SetPrivateKey(key)
body := req.GetBody()
p.WithAddress(object.NewAddressFromV2(body.GetAddress()))
p.WithRawFlag(body.GetRaw())
p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...)
p.SetChunkWriter(&streamObjectRangeWriter{stream})
p.SetRange(object.NewRangeFromV2(body.GetRange()))
return p, nil
}
// can be shared accross all services
func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption {
xHdrs := meta.GetXHeaders()
@ -60,3 +81,14 @@ func splitInfoResponse(info *object.SplitInfo) *objectV2.GetResponse {
return resp
}
func splitInfoRangeResponse(info *object.SplitInfo) *objectV2.GetRangeResponse {
resp := new(objectV2.GetRangeResponse)
body := new(objectV2.GetRangeResponseBody)
resp.SetBody(body)
body.SetRangePart(info.ToV2())
return resp
}

View file

@ -40,10 +40,7 @@ func fullPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart {
hs.SetHeader(obj.GetHeader())
hs.SetSignature(obj.GetSignature())
p := new(objectV2.GetHeaderPartFull)
p.SetHeaderWithSignature(hs)
return p
return hs
}
func shortPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart {
@ -56,8 +53,5 @@ func shortPartFromResponse(r *headsvc.Response) objectV2.GetHeaderPart {
sh.SetVersion(hdr.GetVersion())
sh.SetObjectType(hdr.GetObjectType())
p := new(objectV2.GetHeaderPartShort)
p.SetShortHeader(sh)
return p
return sh
}

View file

@ -1,114 +0,0 @@
package rangesvc
import (
"fmt"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
)
type rangeTraverser struct {
chain *rangeChain
seekBounds *rangeBounds
}
type rangeBounds struct {
left, right uint64
}
type objectRange struct {
rng *objectSDK.Range
id *objectSDK.ID
}
type rangeChain struct {
next, prev *rangeChain
bounds *rangeBounds
id *objectSDK.ID
}
func newRangeTraverser(originSize uint64, rightElement *object.Object, rngSeek *objectSDK.Range) *rangeTraverser {
right := &rangeChain{
bounds: &rangeBounds{
left: originSize - rightElement.PayloadSize(),
right: originSize,
},
id: rightElement.ID(),
}
left := &rangeChain{
id: rightElement.PreviousID(),
}
left.next, right.prev = right, left
return &rangeTraverser{
chain: right,
seekBounds: &rangeBounds{
left: rngSeek.GetOffset(),
right: rngSeek.GetOffset() + rngSeek.GetLength(),
},
}
}
func (c *rangeTraverser) next() *objectRange {
left := c.chain.bounds.left
seekLeft := c.seekBounds.left
res := new(objectRange)
if left > seekLeft {
res.id = c.chain.prev.id
} else {
res.id = c.chain.id
res.rng = objectSDK.NewRange()
res.rng.SetOffset(seekLeft - left)
res.rng.SetLength(min(c.chain.bounds.right, c.seekBounds.right) - seekLeft)
}
return res
}
func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}
func (c *rangeTraverser) pushHeader(obj *object.Object) {
id := obj.ID()
if !id.Equal(c.chain.prev.id) {
panic(fmt.Sprintf("(%T) unexpected identifier in header", c))
}
sz := obj.PayloadSize()
c.chain.prev.bounds = &rangeBounds{
left: c.chain.bounds.left - sz,
right: c.chain.bounds.left,
}
c.chain = c.chain.prev
if prev := obj.PreviousID(); prev != nil {
c.chain.prev = &rangeChain{
next: c.chain,
id: prev,
}
}
}
func (c *rangeTraverser) pushSuccessSize(sz uint64) {
c.seekBounds.left += sz
if c.seekBounds.left >= c.chain.bounds.right && c.chain.next != nil {
c.chain = c.chain.next
}
}

View file

@ -1,28 +0,0 @@
package rangesvc
import (
"io"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/pkg/errors"
)
type localRangeWriter struct {
addr *object.Address
rng *object.Range
storage *engine.StorageEngine
}
func (l *localRangeWriter) WriteTo(w io.Writer) (int64, error) {
rngData, err := engine.GetRange(l.storage, l.addr, l.rng)
if err != nil {
return 0, errors.Wrapf(err, "(%T) could not get object from local storage", l)
}
n, err := w.Write(rngData)
return int64(n), err
}

View file

@ -1,48 +0,0 @@
package rangesvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
)
type Prm struct {
common *util.CommonPrm
full bool
addr *object.Address
rng *object.Range
}
func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm {
if p != nil {
p.common = v
}
return p
}
func (p *Prm) WithAddress(v *object.Address) *Prm {
if p != nil {
p.addr = v
}
return p
}
func (p *Prm) WithRange(v *object.Range) *Prm {
if p != nil {
p.rng = v
}
return p
}
func (p *Prm) FullRange() *Prm {
if p != nil {
p.full = true
}
return p
}

View file

@ -1,67 +0,0 @@
package rangesvc
import (
"context"
"io"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/pkg/errors"
)
type remoteRangeWriter struct {
ctx context.Context
keyStorage *util.KeyStorage
node *network.Address
token *token.SessionToken
bearer *token.BearerToken
addr *object.Address
rng *object.Range
clientCache *cache.ClientCache
clientOpts []client.Option
}
func (r *remoteRangeWriter) WriteTo(w io.Writer) (int64, error) {
key, err := r.keyStorage.GetKey(r.token)
if err != nil {
return 0, errors.Wrapf(err, "(%T) could not receive private key", r)
}
addr, err := r.node.IPAddrString()
if err != nil {
return 0, err
}
c, err := r.clientCache.Get(key, addr, r.clientOpts...)
if err != nil {
return 0, errors.Wrapf(err, "(%T) could not create SDK client %s", r, addr)
}
// TODO: change ObjectPayloadRangeData to implement WriterTo
chunk, err := c.ObjectPayloadRangeData(r.ctx, new(client.RangeDataParams).
WithRange(r.rng).
WithAddress(r.addr),
client.WithTTL(1), // FIXME: use constant
client.WithSession(r.token),
client.WithBearer(r.bearer),
)
if err != nil {
return 0, errors.Wrapf(err, "(%T) could not read object payload range from %s", r, addr)
}
n, err := w.Write(chunk)
return int64(n), err
}

View file

@ -1,27 +0,0 @@
package rangesvc
import (
"github.com/nspcc-dev/neofs-node/pkg/core/object"
)
type Result struct {
head *object.Object
stream Streamer
}
type Response struct {
chunk []byte
}
func (r *Response) PayloadChunk() []byte {
return r.chunk
}
func (r *Result) Head() *object.Object {
return r.head
}
func (r *Result) Stream() Streamer {
return r.stream
}

View file

@ -1,190 +0,0 @@
package rangesvc
import (
"context"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type Service struct {
*cfg
}
type Option func(*cfg)
type cfg struct {
keyStorage *objutil.KeyStorage
localStore *engine.StorageEngine
cnrSrc container.Source
netMapSrc netmap.Source
workerPool util.WorkerPool
localAddrSrc network.LocalAddressSource
headSvc *headsvc.Service
clientCache *cache.ClientCache
log *logger.Logger
clientOpts []client.Option
}
func defaultCfg() *cfg {
return &cfg{
workerPool: new(util.SyncWorkerPool),
log: zap.L(),
}
}
func NewService(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
func (s *Service) GetRange(ctx context.Context, prm *Prm) (*Result, error) {
headResult, err := s.headSvc.Head(ctx, new(headsvc.Prm).
WithAddress(prm.addr).
WithCommonPrm(prm.common),
)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive Head result", s)
}
origin := headResult.Header()
originSize := origin.PayloadSize()
if prm.full {
prm.rng = new(object.Range)
prm.rng.SetLength(originSize)
}
if originSize < prm.rng.GetOffset()+prm.rng.GetLength() {
return nil, errors.Errorf("(%T) requested payload range is out-of-bounds", s)
}
rngTraverser := objutil.NewRangeTraverser(originSize, origin, prm.rng)
if err := s.fillTraverser(ctx, prm, rngTraverser); err != nil {
return nil, errors.Wrapf(err, "(%T) could not fill range traverser", s)
}
return &Result{
head: origin,
stream: &streamer{
cfg: s.cfg,
once: new(sync.Once),
ctx: ctx,
prm: prm,
rangeTraverser: rngTraverser,
},
}, nil
}
func (s *Service) fillTraverser(ctx context.Context, prm *Prm, traverser *objutil.RangeTraverser) error {
addr := object.NewAddress()
addr.SetContainerID(prm.addr.ContainerID())
for {
nextID, nextRng := traverser.Next()
if nextRng != nil {
return nil
}
addr.SetObjectID(nextID)
head, err := s.headSvc.Head(ctx, new(headsvc.Prm).
WithAddress(addr).
WithCommonPrm(prm.common),
)
if err != nil {
return errors.Wrapf(err, "(%T) could not receive object header", s)
}
traverser.PushHeader(head.Header())
}
}
func WithKeyStorage(v *objutil.KeyStorage) Option {
return func(c *cfg) {
c.keyStorage = v
}
}
func WithLocalStorage(v *engine.StorageEngine) Option {
return func(c *cfg) {
c.localStore = v
}
}
func WithContainerSource(v container.Source) Option {
return func(c *cfg) {
c.cnrSrc = v
}
}
func WithNetworkMapSource(v netmap.Source) Option {
return func(c *cfg) {
c.netMapSrc = v
}
}
func WithWorkerPool(v util.WorkerPool) Option {
return func(c *cfg) {
c.workerPool = v
}
}
func WithLocalAddressSource(v network.LocalAddressSource) Option {
return func(c *cfg) {
c.localAddrSrc = v
}
}
func WithHeadService(v *headsvc.Service) Option {
return func(c *cfg) {
c.headSvc = v
}
}
func WithClientCache(v *cache.ClientCache) Option {
return func(c *cfg) {
c.clientCache = v
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
func WithClientOptions(opts ...client.Option) Option {
return func(c *cfg) {
c.clientOpts = opts
}
}

View file

@ -1,241 +0,0 @@
package rangesvc
import (
"context"
"io"
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/pkg/errors"
)
type Streamer interface {
Recv() (*Response, error)
}
type streamer struct {
*cfg
once *sync.Once
ctx context.Context
prm *Prm
traverser *placement.Traverser
rangeTraverser *svcutil.RangeTraverser
ch chan []byte
}
type chunkWriter struct {
ctx context.Context
ch chan<- []byte
written uint64
}
func (p *streamer) Recv() (*Response, error) {
var err error
p.once.Do(func() {
p.ch = make(chan []byte)
err = p.workerPool.Submit(p.start)
})
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not start streaming", p)
}
select {
case <-p.ctx.Done():
return nil, errors.Wrapf(p.ctx.Err(), "(%T) stream is stopped by context", p)
case v, ok := <-p.ch:
if !ok {
if _, rng := p.rangeTraverser.Next(); rng.GetLength() != 0 {
return nil, errors.Errorf("(%T) incomplete get payload range", p)
}
return nil, io.EOF
}
return &Response{
chunk: v,
}, nil
}
}
func (p *streamer) switchToObject(id *object.ID) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
if err != nil {
return errors.Wrapf(err, "(%T) could not get latest network map", p)
}
// get container to read payload range
cnr, err := p.cnrSrc.Get(p.prm.addr.ContainerID())
if err != nil {
return errors.Wrapf(err, "(%T) could not get container by ID", p)
}
// allocate placement traverser options
traverseOpts := make([]placement.Option, 0, 4)
// add common options
traverseOpts = append(traverseOpts,
// set processing container
placement.ForContainer(cnr),
// set success count (1st incoming full range)
placement.SuccessAfter(1),
// set identifier of the processing object
placement.ForObject(id),
)
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if p.prm.common.LocalOnly() {
// use local-only placement builder
builder = svcutil.NewLocalPlacement(builder, p.localAddrSrc)
}
// set placement builder
traverseOpts = append(traverseOpts, placement.UseBuilder(builder))
// build placement traverser
if p.traverser, err = placement.NewTraverser(traverseOpts...); err != nil {
return errors.Wrapf(err, "(%T) could not build placement traverser", p)
}
return nil
}
func (p *streamer) start() {
defer close(p.ch)
objAddr := object.NewAddress()
objAddr.SetContainerID(p.prm.addr.ContainerID())
loop:
for {
select {
case <-p.ctx.Done():
// TODO: log this
break loop
default:
}
nextID, nextRange := p.rangeTraverser.Next()
if nextRange.GetLength() == 0 {
break
} else if err := p.switchToObject(nextID); err != nil {
// TODO: log error
break
}
objAddr.SetObjectID(nextID)
subloop:
for {
select {
case <-p.ctx.Done():
// TODO: log this
break loop
default:
}
addrs := p.traverser.Next()
if len(addrs) == 0 {
break
}
for i := range addrs {
wg := new(sync.WaitGroup)
wg.Add(1)
addr := addrs[i]
if err := p.workerPool.Submit(func() {
defer wg.Done()
var rngWriter io.WriterTo
if network.IsLocalAddress(p.localAddrSrc, addr) {
rngWriter = &localRangeWriter{
addr: objAddr,
rng: nextRange,
storage: p.localStore,
}
} else {
rngWriter = &remoteRangeWriter{
ctx: p.ctx,
keyStorage: p.keyStorage,
node: addr,
token: p.prm.common.SessionToken(),
bearer: p.prm.common.BearerToken(),
addr: objAddr,
rng: nextRange,
clientCache: p.clientCache,
clientOpts: p.clientOpts,
}
}
written, err := rngWriter.WriteTo(&chunkWriter{
ctx: p.ctx,
ch: p.ch,
})
if err != nil {
svcutil.LogServiceError(p.log, "RANGE", addr, err)
}
ln := nextRange.GetLength()
uw := uint64(written)
p.rangeTraverser.PushSuccessSize(uw)
nextRange.SetLength(ln - uw)
nextRange.SetOffset(nextRange.GetOffset() + uw)
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(p.log, "RANGE", err)
break loop
}
wg.Wait()
if nextRange.GetLength() == 0 {
p.traverser.SubmitSuccess()
break subloop
}
}
}
if !p.traverser.Success() {
// TODO: log error
break loop
}
}
}
func (w *chunkWriter) Write(p []byte) (int, error) {
select {
case <-w.ctx.Done():
return 0, w.ctx.Err()
case w.ch <- p:
}
w.written += uint64(len(p))
return len(p), nil
}

View file

@ -1,50 +0,0 @@
package rangesvc
import (
"context"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
"github.com/pkg/errors"
)
// Service implements GetRange operation of Object service v2.
type Service struct {
*cfg
}
// Option represents Service constructor option.
type Option func(*cfg)
type cfg struct {
svc *rangesvc.Service
}
// NewService constructs Service instance from provided options.
func NewService(opts ...Option) *Service {
c := new(cfg)
for i := range opts {
opts[i](c)
}
return &Service{
cfg: c,
}
}
// GetRange calls internal service and returns v2 object payload range stream.
func (s *Service) GetRange(ctx context.Context, req *objectV2.GetRangeRequest) (objectV2.GetRangeObjectStreamer, error) {
r, err := s.svc.GetRange(ctx, toPrm(req))
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not get object payload range data", s)
}
return fromResponse(r.Stream()), nil
}
func WithInternalService(v *rangesvc.Service) Option {
return func(c *cfg) {
c.svc = v
}
}

View file

@ -1,27 +0,0 @@
package rangesvc
import (
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
"github.com/pkg/errors"
)
type streamer struct {
stream rangesvc.Streamer
body *objectV2.GetRangeResponseBody
}
func (s *streamer) Recv() (*objectV2.GetRangeResponse, error) {
r, err := s.stream.Recv()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not read response from stream", s)
}
s.body.SetChunk(r.PayloadChunk())
resp := new(objectV2.GetRangeResponse)
resp.SetBody(s.body)
return resp, nil
}

View file

@ -1,26 +0,0 @@
package rangesvc
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
)
func toPrm(req *objectV2.GetRangeRequest) *rangesvc.Prm {
body := req.GetBody()
return new(rangesvc.Prm).
WithAddress(
object.NewAddressFromV2(body.GetAddress()),
).
WithRange(object.NewRangeFromV2(body.GetRange())).
WithCommonPrm(util.CommonPrmFromV2(req))
}
func fromResponse(stream rangesvc.Streamer) objectV2.GetRangeObjectStreamer {
return &streamer{
stream: stream,
body: new(objectV2.GetRangeResponseBody),
}
}

View file

@ -4,7 +4,6 @@ import (
"context"
"crypto/sha256"
"fmt"
"io"
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
@ -14,8 +13,8 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
rangesvc "github.com/nspcc-dev/neofs-node/pkg/services/object/range"
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
@ -44,7 +43,7 @@ type cfg struct {
headSvc *headsvc.Service
rangeSvc *rangesvc.Service
rangeSvc *getsvc.Service
clientCache *cache.ClientCache
@ -171,23 +170,15 @@ func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.Ra
if prm.typ == pkg.ChecksumSHA256 && nextRng.GetLength() != rng.GetLength() {
// here we cannot receive SHA256 checksum through GetRangeHash service
// since SHA256 is not homomorphic
res, err := s.rangeSvc.GetRange(ctx, new(rangesvc.Prm).
WithAddress(addr).
WithRange(nextRng).
WithCommonPrm(prm.common),
)
rngPrm := getsvc.RangePrm{}
rngPrm.SetRange(nextRng)
rngPrm.WithAddress(addr)
rngPrm.SetChunkWriter(hasher)
err := s.rangeSvc.GetRange(ctx, rngPrm)
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive payload range for %v checksum", s, prm.typ)
}
for stream := res.Stream(); ; {
resp, err := stream.Recv()
if errors.Is(errors.Cause(err), io.EOF) {
break
}
hasher.add(resp.PayloadChunk())
}
} else {
resp, err := (&distributedHasher{
cfg: s.cfg,
@ -206,7 +197,7 @@ func (s *Service) getHashes(ctx context.Context, prm *Prm, traverser *objutil.Ra
return nil, errors.Errorf("(%T) unexpected %v hashes amount %d", s, prm.typ, ln)
}
hasher.add(hs[0])
_ = hasher.WriteChunk(hs[0])
}
traverser.PushSuccessSize(nextRng.GetLength())
@ -265,7 +256,7 @@ func WithHeadService(v *headsvc.Service) Option {
}
}
func WithRangeService(v *rangesvc.Service) Option {
func WithRangeService(v *getsvc.Service) Option {
return func(c *cfg) {
c.rangeSvc = v
}

View file

@ -20,7 +20,7 @@ type onceHashWriter struct {
}
type hasher interface {
add([]byte)
WriteChunk([]byte) error
sum() ([]byte, error)
}
@ -36,8 +36,10 @@ type singleHasher struct {
hash []byte
}
func (h *singleHasher) add(p []byte) {
func (h *singleHasher) WriteChunk(p []byte) error {
h.hash = p
return nil
}
func (h *singleHasher) sum() ([]byte, error) {
@ -52,18 +54,20 @@ func (w *onceHashWriter) write(hs [][]byte) {
})
}
func (h *tzHasher) add(p []byte) {
func (h *tzHasher) WriteChunk(p []byte) error {
h.hashes = append(h.hashes, p)
return
return nil
}
func (h *tzHasher) sum() ([]byte, error) {
return tz.Concat(h.hashes)
}
func (h *commonHasher) add(p []byte) {
func (h *commonHasher) WriteChunk(p []byte) error {
h.h.Write(p)
return nil
}
func (h *commonHasher) sum() ([]byte, error) {

View file

@ -26,7 +26,9 @@ type getStreamResponser struct {
}
type getRangeStreamResponser struct {
stream *response.ServerMessageStreamer
util.ServerStream
respWriter util.ResponseMessageWriter
}
type putStreamResponser struct {
@ -143,35 +145,17 @@ func (s *ResponseService) Delete(ctx context.Context, req *object.DeleteRequest)
return resp.(*object.DeleteResponse), nil
}
func (s *getRangeStreamResponser) Recv() (*object.GetRangeResponse, error) {
r, err := s.stream.Recv()
if err != nil {
return nil, errors.Wrapf(err, "(%T) could not receive response", s)
}
return r.(*object.GetRangeResponse), nil
func (s *getRangeStreamResponser) Send(resp *object.GetRangeResponse) error {
return s.respWriter(resp)
}
func (s *ResponseService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
stream, err := s.respSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest))
if err != nil {
return nil, err
}
return func() (util.ResponseMessage, error) {
return stream.Recv()
}, nil
},
)
if err != nil {
return nil, err
}
return &getRangeStreamResponser{
stream: stream,
}, nil
func (s *ResponseService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
return s.svc.GetRange(req, &getRangeStreamResponser{
ServerStream: stream,
respWriter: s.respSvc.HandleServerStreamRequest_(func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetRangeResponse))
}),
})
}
func (s *ResponseService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {

View file

@ -13,6 +13,12 @@ type GetObjectStream interface {
Send(*object.GetResponse) error
}
// GetObjectRangeStream is an interface of NeoFS API v2 compatible payload range streamer.
type GetObjectRangeStream interface {
util.ServerStream
Send(*object.GetRangeResponse) error
}
// ServiceServer is an interface of utility
// serving v2 Object service.
type ServiceServer interface {
@ -21,6 +27,6 @@ type ServiceServer interface {
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
Search(context.Context, *object.SearchRequest) (object.SearchObjectStreamer, error)
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
GetRange(context.Context, *object.GetRangeRequest) (object.GetRangeObjectStreamer, error)
GetRange(*object.GetRangeRequest, GetObjectRangeStream) error
GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error)
}

View file

@ -32,7 +32,9 @@ type putStreamSigner struct {
}
type getRangeStreamSigner struct {
stream *util.ResponseMessageStreamer
util.ServerStream
respWriter util.ResponseMessageWriter
}
func NewSignService(key *ecdsa.PrivateKey, svc ServiceServer) *SignService {
@ -151,35 +153,24 @@ func (s *SignService) Delete(ctx context.Context, req *object.DeleteRequest) (*o
return resp.(*object.DeleteResponse), nil
}
func (s *getRangeStreamSigner) Recv() (*object.GetRangeResponse, error) {
r, err := s.stream.Recv()
if err != nil {
return nil, errors.Wrap(err, "could not receive response")
}
return r.(*object.GetRangeResponse), nil
func (s *getRangeStreamSigner) Send(resp *object.GetRangeResponse) error {
return s.respWriter(resp)
}
func (s *SignService) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
stream, err := s.sigSvc.HandleServerStreamRequest(ctx, req,
func(ctx context.Context, req interface{}) (util.ResponseMessageReader, error) {
stream, err := s.svc.GetRange(ctx, req.(*object.GetRangeRequest))
if err != nil {
return nil, err
}
return func() (util.ResponseMessage, error) {
return stream.Recv()
}, nil
func (s *SignService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
respWriter, err := s.sigSvc.HandleServerStreamRequest_(req,
func(resp util.ResponseMessage) error {
return stream.Send(resp.(*object.GetRangeResponse))
},
)
if err != nil {
return nil, err
return err
}
return &getRangeStreamSigner{
stream: stream,
}, nil
return s.svc.GetRange(req, &getRangeStreamSigner{
ServerStream: stream,
respWriter: respWriter,
})
}
func (s *SignService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {

View file

@ -37,10 +37,11 @@ type (
addrAmount uint64
}
rangeStreamBasicChecker struct {
next object.GetRangeObjectStreamer
buf *bytes.Buffer
resp *object.GetRangeResponse
rangeStreamMsgSizeCtrl struct {
util.ServerStream
stream GetObjectRangeStream
chunkSize int
}
)
@ -112,46 +113,47 @@ func (c TransportSplitter) Delete(ctx context.Context, request *object.DeleteReq
return c.next.Delete(ctx, request)
}
func (c TransportSplitter) GetRange(ctx context.Context, request *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) {
stream, err := c.next.GetRange(ctx, request)
func (s *rangeStreamMsgSizeCtrl) Send(resp *object.GetRangeResponse) error {
body := resp.GetBody()
return &rangeStreamBasicChecker{
next: stream,
chunkSize: int(c.chunkSize),
}, err
chunkPart, ok := body.GetRangePart().(*object.GetRangePartChunk)
if !ok {
return s.stream.Send(resp)
}
var newResp *object.GetRangeResponse
for buf := bytes.NewBuffer(chunkPart.GetChunk()); buf.Len() > 0; {
if newResp == nil {
newResp = new(object.GetRangeResponse)
newResp.SetBody(body)
}
chunkPart.SetChunk(buf.Next(s.chunkSize))
body.SetRangePart(chunkPart)
newResp.SetMetaHeader(resp.GetMetaHeader())
newResp.SetVerificationHeader(resp.GetVerificationHeader())
if err := s.stream.Send(newResp); err != nil {
return err
}
}
return nil
}
func (c TransportSplitter) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
return c.next.GetRange(req, &rangeStreamMsgSizeCtrl{
ServerStream: stream,
stream: stream,
chunkSize: int(c.chunkSize),
})
}
func (c TransportSplitter) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
return c.next.GetRangeHash(ctx, request)
}
func (r *rangeStreamBasicChecker) Recv() (*object.GetRangeResponse, error) {
if r.resp == nil {
resp, err := r.next.Recv()
if err != nil {
return resp, err
}
r.resp = resp
r.buf = bytes.NewBuffer(resp.GetBody().GetChunk())
}
body := new(object.GetRangeResponseBody)
body.SetChunk(r.buf.Next(r.chunkSize))
resp := new(object.GetRangeResponse)
resp.SetVerificationHeader(r.resp.GetVerificationHeader())
resp.SetMetaHeader(r.resp.GetMetaHeader())
resp.SetBody(body)
if r.buf.Len() == 0 {
r.buf = nil
r.resp = nil
}
return resp, nil
}
func (s *searchStreamBasicChecker) Recv() (*object.SearchResponse, error) {
if s.resp == nil {
resp, err := s.next.Recv()