[#239] object/head: Implement new service processing

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-12-09 13:32:33 +03:00 committed by Alex Vanin
parent 9dd83bdf0d
commit 0e1f05ff45
11 changed files with 348 additions and 140 deletions

View file

@ -25,7 +25,6 @@ import (
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
getsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/get/v2"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
headsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/head/v2"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
@ -44,8 +43,6 @@ type objectSvc struct {
search *searchsvcV2.Service
head *headsvcV2.Service
get *getsvcV2.Service
delete *deletesvcV2.Service
@ -140,7 +137,7 @@ func (s *objectSvc) Put(ctx context.Context) (object.PutObjectStreamer, error) {
}
func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
return s.head.Head(ctx, req)
return s.get.Head(ctx, req)
}
func (s *objectSvc) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) {
@ -342,15 +339,10 @@ func initObjectService(c *cfg) {
),
)
sHeadV2 := headsvcV2.NewService(
headsvcV2.WithInternalService(sHead),
)
sGet := getsvc.New(
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientCache(clientCache),
getsvc.WithHeadService(sHead),
getsvc.WithClientOptions(
client.WithDialTimeout(c.viper.GetDuration(cfgObjectGetDialTimeout)),
),
@ -403,7 +395,6 @@ func initObjectService(c *cfg) {
&objectSvc{
put: sPutV2,
search: sSearchV2,
head: sHeadV2,
get: sGetV2,
delete: sDeleteV2,
},

View file

@ -35,8 +35,12 @@ type execCtx struct {
collectedObject *object.Object
curOff uint64
head bool
}
type execOption func(*execCtx)
const (
statusUndefined int = iota
statusOK
@ -45,9 +49,23 @@ const (
statusOutOfRange
)
func headOnly() execOption {
return func(c *execCtx) {
c.head = true
}
}
func withPayloadRange(r *objectSDK.Range) execOption {
return func(c *execCtx) {
c.prm.rng = r
}
}
func (exec *execCtx) setLogger(l *logger.Logger) {
req := "GET"
if exec.ctxRange() != nil {
if exec.headOnly() {
req = "HEAD"
} else if exec.ctxRange() != nil {
req = "GET_RANGE"
}
@ -92,7 +110,7 @@ func (exec execCtx) remotePrm() *client.GetObjectParams {
}
func (exec *execCtx) canAssemble() bool {
return exec.svc.assembly && !exec.isRaw()
return exec.svc.assembly && !exec.isRaw() && !exec.headOnly()
}
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
@ -107,6 +125,10 @@ func (exec *execCtx) ctxRange() *objectSDK.Range {
return exec.prm.rng
}
func (exec *execCtx) headOnly() bool {
return exec.head
}
func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr)
@ -126,7 +148,7 @@ func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Trav
}
func (exec *execCtx) getChild(id *objectSDK.ID, rng *objectSDK.Range) (*object.Object, bool) {
w := newSimpleObjectWriter()
w := NewSimpleObjectWriter()
p := exec.prm
p.common = p.common.WithLocalOnly(false)
@ -139,9 +161,9 @@ func (exec *execCtx) getChild(id *objectSDK.ID, rng *objectSDK.Range) (*object.O
p.WithAddress(addr)
exec.statusError = exec.svc.get(exec.context(), p)
exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
return w.object(), exec.status == statusOK
return w.Object(), exec.status == statusOK
}
func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) {
@ -153,9 +175,14 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) {
p.common = p.common.WithLocalOnly(false)
p.WithAddress(childAddr)
header, err := exec.svc.headSvc.head(exec.context(), Prm{
prm := HeadPrm{
commonPrm: p.commonPrm,
})
}
w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w)
err := exec.svc.Head(exec.context(), prm)
switch {
default:
@ -172,7 +199,7 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) {
exec.status = statusOK
exec.err = nil
return header, true
return w.Object(), true
}
}
@ -244,6 +271,10 @@ func (exec *execCtx) writeCollectedHeader() bool {
}
func (exec *execCtx) writeObjectPayload(obj *object.Object) bool {
if exec.headOnly() {
return true
}
err := exec.prm.objWriter.WriteChunk(obj.Payload())
switch {

View file

@ -9,14 +9,12 @@ import (
// Get serves a request to get an object by address, and returns Streamer instance.
func (s *Service) Get(ctx context.Context, prm Prm) error {
return s.get(ctx, RangePrm{
commonPrm: prm.commonPrm,
}).err
return s.get(ctx, prm.commonPrm).err
}
// GetRange serves a request to get an object by address, and returns Streamer instance.
func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
return s.get(ctx, prm).err
return s.get(ctx, prm.commonPrm, withPayloadRange(prm.rng)).err
}
func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) {
@ -50,14 +48,28 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
}, nil
}
func (s *Service) get(ctx context.Context, prm RangePrm) statusError {
// Head reads object header from container.
//
// Returns ErrNotFound if the header was not received for the call.
// Returns SplitInfoError if object is virtual and raw flag is set.
func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
return s.get(ctx, prm.commonPrm, headOnly()).err
}
func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {
exec := &execCtx{
svc: s,
ctx: ctx,
prm: prm,
prm: RangePrm{
commonPrm: prm,
},
infoSplit: objectSDK.NewSplitInfo(),
}
for i := range opts {
opts[i](exec)
}
exec.setLogger(s.log)
exec.execute()

View file

@ -93,8 +93,8 @@ func newTestClient() *testClient {
}
}
func (c *testClient) GetObject(_ context.Context, p RangePrm) (*objectSDK.Object, error) {
v, ok := c.results[p.Address().String()]
func (c *testClient) getObject(exec *execCtx) (*objectSDK.Object, error) {
v, ok := c.results[exec.address().String()]
if !ok {
return nil, object.ErrNotFound
}
@ -103,7 +103,7 @@ func (c *testClient) GetObject(_ context.Context, p RangePrm) (*objectSDK.Object
return nil, v.err
}
return cutToRange(v.obj.Object(), p.rng).SDK(), nil
return cutToRange(v.obj.Object(), exec.ctxRange()).SDK(), nil
}
func (c *testClient) head(_ context.Context, p Prm) (*object.Object, error) {
@ -126,11 +126,11 @@ func (c *testClient) addResult(addr *objectSDK.Address, obj *object.RawObject, e
}{obj: obj, err: err}
}
func (s *testStorage) Get(p RangePrm) (*object.Object, error) {
func (s *testStorage) get(exec *execCtx) (*object.Object, error) {
var (
ok bool
obj *object.Object
sAddr = p.Address().String()
sAddr = exec.address().String()
)
if _, ok = s.inhumed[sAddr]; ok {
@ -142,7 +142,7 @@ func (s *testStorage) Get(p RangePrm) (*object.Object, error) {
}
if obj, ok = s.phy[sAddr]; ok {
return cutToRange(obj, p.rng), nil
return cutToRange(obj, exec.ctxRange()), nil
}
return nil, object.ErrNotFound
@ -207,6 +207,7 @@ func generateObject(addr *objectSDK.Address, prev *objectSDK.ID, payload []byte,
obj.SetContainerID(addr.ContainerID())
obj.SetID(addr.ObjectID())
obj.SetPayload(payload)
obj.SetPayloadSize(uint64(len(payload)))
obj.SetPreviousID(prev)
obj.SetChildren(children...)
@ -249,11 +250,20 @@ func TestGetLocalOnly(t *testing.T) {
return p
}
newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm {
p := HeadPrm{}
p.SetHeaderWriter(w)
p.WithRawFlag(raw)
p.common = new(util.CommonPrm).WithLocalOnly(true)
return p
}
t.Run("OK", func(t *testing.T) {
storage := newTestStorage()
svc := newSvc(storage)
w := newSimpleObjectWriter()
w := NewSimpleObjectWriter()
p := newPrm(false, w)
payloadSz := uint64(10)
@ -273,16 +283,24 @@ func TestGetLocalOnly(t *testing.T) {
require.NoError(t, err)
require.Equal(t, obj.Object(), w.object())
require.Equal(t, obj.Object(), w.Object())
w = newSimpleObjectWriter()
w = NewSimpleObjectWriter()
rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.NoError(t, err)
require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.object().Payload())
require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.Object().Payload())
w = NewSimpleObjectWriter()
headPrm := newHeadPrm(false, w)
headPrm.WithAddress(addr)
err = svc.Head(ctx, headPrm)
require.NoError(t, err)
require.Equal(t, obj.CutPayload().Object(), w.Object())
})
t.Run("INHUMED", func(t *testing.T) {
@ -306,6 +324,12 @@ func TestGetLocalOnly(t *testing.T) {
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
headPrm := newHeadPrm(false, nil)
headPrm.WithAddress(addr)
err = svc.Head(ctx, headPrm)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
})
t.Run("404", func(t *testing.T) {
@ -328,6 +352,12 @@ func TestGetLocalOnly(t *testing.T) {
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
headPrm := newHeadPrm(false, nil)
headPrm.WithAddress(addr)
err = svc.Head(ctx, headPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("VIRTUAL", func(t *testing.T) {
@ -361,6 +391,13 @@ func TestGetLocalOnly(t *testing.T) {
err = svc.Get(ctx, p)
require.True(t, errors.As(err, &errSplit))
headPrm := newHeadPrm(true, nil)
headPrm.WithAddress(addr)
err = svc.Head(ctx, headPrm)
require.True(t, errors.As(err, &errSplit))
require.Equal(t, splitInfo, errSplit.SplitInfo())
})
}
@ -440,7 +477,7 @@ func TestGetRemoteSmall(t *testing.T) {
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(true)
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
svc.assembly = true
svc.traverserGenerator = &testTraverserGenerator{
@ -476,6 +513,15 @@ func TestGetRemoteSmall(t *testing.T) {
return p
}
newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm {
p := HeadPrm{}
p.SetHeaderWriter(w)
p.WithRawFlag(raw)
p.common = new(util.CommonPrm).WithLocalOnly(false)
return p
}
t.Run("OK", func(t *testing.T) {
addr := generateAddress()
addr.SetContainerID(cid)
@ -507,28 +553,36 @@ func TestGetRemoteSmall(t *testing.T) {
},
})
w := newSimpleObjectWriter()
w := NewSimpleObjectWriter()
p := newPrm(false, w)
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, obj.Object(), w.object())
require.Equal(t, obj.Object(), w.Object())
*c1, *c2 = *c2, *c1
err = svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, obj.Object(), w.object())
require.Equal(t, obj.Object(), w.Object())
w = newSimpleObjectWriter()
w = NewSimpleObjectWriter()
rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.NoError(t, err)
require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.object().Payload())
require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.Object().Payload())
w = NewSimpleObjectWriter()
headPrm := newHeadPrm(false, w)
headPrm.WithAddress(addr)
err = svc.Head(ctx, headPrm)
require.NoError(t, err)
require.Equal(t, obj.CutPayload().Object(), w.Object())
})
t.Run("INHUMED", func(t *testing.T) {
@ -567,6 +621,12 @@ func TestGetRemoteSmall(t *testing.T) {
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
headPrm := newHeadPrm(false, nil)
headPrm.WithAddress(addr)
err = svc.Head(ctx, headPrm)
require.True(t, errors.Is(err, object.ErrAlreadyRemoved))
})
t.Run("404", func(t *testing.T) {
@ -605,9 +665,26 @@ func TestGetRemoteSmall(t *testing.T) {
err = svc.GetRange(ctx, rngPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
headPrm := newHeadPrm(false, nil)
headPrm.WithAddress(addr)
err = svc.Head(ctx, headPrm)
require.True(t, errors.Is(err, object.ErrNotFound))
})
t.Run("VIRTUAL", func(t *testing.T) {
testHeadVirtual := func(svc *Service, addr *objectSDK.Address, i *objectSDK.SplitInfo) {
headPrm := newHeadPrm(false, nil)
headPrm.WithAddress(addr)
errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())
err := svc.Head(ctx, headPrm)
require.True(t, errors.As(err, &errSplit))
require.Equal(t, i, errSplit.SplitInfo())
}
t.Run("linking", func(t *testing.T) {
t.Run("get linking failure", func(t *testing.T) {
addr := generateAddress()
@ -645,6 +722,8 @@ func TestGetRemoteSmall(t *testing.T) {
},
})
testHeadVirtual(svc, addr, splitInfo)
p := newPrm(false, nil)
p.WithAddress(addr)
@ -717,15 +796,15 @@ func TestGetRemoteSmall(t *testing.T) {
},
})
p := newPrm(false, newSimpleObjectWriter())
testHeadVirtual(svc, addr, splitInfo)
p := newPrm(false, NewSimpleObjectWriter())
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
svc.headSvc = c2
rngPrm := newRngPrm(false, newSimpleObjectWriter(), 0, 1)
rngPrm := newRngPrm(false, NewSimpleObjectWriter(), 0, 1)
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
@ -792,18 +871,18 @@ func TestGetRemoteSmall(t *testing.T) {
},
})
w := newSimpleObjectWriter()
testHeadVirtual(svc, addr, splitInfo)
w := NewSimpleObjectWriter()
p := newPrm(false, w)
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, srcObj.Object(), w.object())
require.Equal(t, srcObj.Object(), w.Object())
svc.headSvc = c2
w = newSimpleObjectWriter()
w = NewSimpleObjectWriter()
payloadSz := srcObj.PayloadSize()
off := payloadSz / 3
@ -814,7 +893,7 @@ func TestGetRemoteSmall(t *testing.T) {
err = svc.GetRange(ctx, rngPrm)
require.NoError(t, err)
require.Equal(t, payload[off:off+ln], w.object().Payload())
require.Equal(t, payload[off:off+ln], w.Object().Payload())
})
})
@ -855,6 +934,8 @@ func TestGetRemoteSmall(t *testing.T) {
},
})
testHeadVirtual(svc, addr, splitInfo)
p := newPrm(false, nil)
p.WithAddress(addr)
@ -917,11 +998,12 @@ func TestGetRemoteSmall(t *testing.T) {
},
})
testHeadVirtual(svc, addr, splitInfo)
headSvc := newTestClient()
headSvc.addResult(preRightAddr, nil, object.ErrNotFound)
svc.headSvc = headSvc
p := newPrm(false, newSimpleObjectWriter())
p := newPrm(false, NewSimpleObjectWriter())
p.WithAddress(addr)
err := svc.Get(ctx, p)
@ -987,18 +1069,18 @@ func TestGetRemoteSmall(t *testing.T) {
},
})
svc.headSvc = c2
testHeadVirtual(svc, addr, splitInfo)
w := newSimpleObjectWriter()
w := NewSimpleObjectWriter()
p := newPrm(false, w)
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, srcObj.Object(), w.object())
require.Equal(t, srcObj.Object(), w.Object())
w = newSimpleObjectWriter()
w = NewSimpleObjectWriter()
payloadSz := srcObj.PayloadSize()
off := payloadSz / 3
@ -1009,9 +1091,9 @@ func TestGetRemoteSmall(t *testing.T) {
err = svc.GetRange(ctx, rngPrm)
require.NoError(t, err)
require.Equal(t, payload[off:off+ln], w.object().Payload())
require.Equal(t, payload[off:off+ln], w.Object().Payload())
w = newSimpleObjectWriter()
w = NewSimpleObjectWriter()
off = payloadSz - 2
ln = 1
@ -1020,7 +1102,7 @@ func TestGetRemoteSmall(t *testing.T) {
err = svc.GetRange(ctx, rngPrm)
require.NoError(t, err)
require.Equal(t, payload[off:off+ln], w.object().Payload())
require.Equal(t, payload[off:off+ln], w.Object().Payload())
})
})
})

View file

@ -10,7 +10,7 @@ import (
func (exec *execCtx) executeLocal() {
var err error
exec.collectedObject, err = exec.svc.localStorage.Get(exec.prm)
exec.collectedObject, err = exec.svc.localStorage.get(exec)
var errSplitInfo *objectSDK.SplitInfoError

View file

@ -31,6 +31,11 @@ type RangeHashPrm struct {
rngs []*objectSDK.Range
}
// HeadPrm groups parameters of Head service call.
type HeadPrm struct {
commonPrm
}
type commonPrm struct {
objWriter ObjectWriter
@ -50,9 +55,15 @@ type ChunkWriter interface {
WriteChunk([]byte) error
}
// HeaderWriter is an interface of target component
// to write object header.
type HeaderWriter interface {
WriteHeader(*object.Object) error
}
// ObjectWriter is an interface of target component to write object.
type ObjectWriter interface {
WriteHeader(*object.Object) error
HeaderWriter
ChunkWriter
}
@ -71,9 +82,9 @@ func (p *commonPrm) SetRemoteCallOptions(opts ...client.CallOption) {
p.callOpts = opts
}
// SetObjectWriter sets target component to write the object payload range.
// SetChunkWriter sets target component to write the object payload range.
func (p *RangePrm) SetChunkWriter(w ChunkWriter) {
p.objWriter = &rangeWriter{
p.objWriter = &partWriter{
chunkWriter: w,
}
}
@ -97,3 +108,10 @@ func (p *RangeHashPrm) SetHashGenerator(v func() hash.Hash) {
func (p *commonPrm) SetCommonParameters(common *util.CommonPrm) {
p.common = common
}
// SetHeaderWriter sets target component to write the object header.
func (p *HeadPrm) SetHeaderWriter(w HeaderWriter) {
p.objWriter = &partWriter{
headWriter: w,
}
}

View file

@ -20,7 +20,7 @@ func (exec *execCtx) processNode(ctx context.Context, addr *network.Address) boo
return true
}
obj, err := client.GetObject(ctx, exec.prm)
obj, err := client.getObject(exec)
var errSplitInfo *objectSDK.SplitInfoError

View file

@ -1,7 +1,6 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
@ -9,7 +8,6 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
@ -25,7 +23,7 @@ type Service struct {
type Option func(*cfg)
type getClient interface {
GetObject(context.Context, RangePrm) (*objectSDK.Object, error)
getObject(*execCtx) (*objectSDK.Object, error)
}
type cfg struct {
@ -33,12 +31,8 @@ type cfg struct {
log *logger.Logger
headSvc interface {
head(context.Context, Prm) (*object.Object, error)
}
localStorage interface {
Get(RangePrm) (*object.Object, error)
get(*execCtx) (*object.Object, error)
}
clientCache interface {
@ -54,7 +48,6 @@ func defaultCfg() *cfg {
return &cfg{
assembly: true,
log: zap.L(),
headSvc: new(headSvcWrapper),
localStorage: new(storageEngineWrapper),
clientCache: new(clientCacheWrapper),
}
@ -117,10 +110,3 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
c.traverserGenerator = t
}
}
// WithHeadService returns option to set the utility serving object.Head.
func WithHeadService(svc *headsvc.Service) Option {
return func(c *cfg) {
c.headSvc.(*headSvcWrapper).svc = svc
}
}

View file

@ -1,7 +1,6 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"hash"
@ -10,10 +9,9 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
)
type simpleObjectWriter struct {
type SimpleObjectWriter struct {
obj *object.RawObject
pld []byte
@ -33,13 +31,11 @@ type storageEngineWrapper struct {
engine *engine.StorageEngine
}
type headSvcWrapper struct {
svc *headsvc.Service
}
type rangeWriter struct {
type partWriter struct {
ObjectWriter
headWriter HeaderWriter
chunkWriter ChunkWriter
}
@ -47,13 +43,13 @@ type hasherWrapper struct {
hash hash.Hash
}
func newSimpleObjectWriter() *simpleObjectWriter {
return &simpleObjectWriter{
func NewSimpleObjectWriter() *SimpleObjectWriter {
return &SimpleObjectWriter{
obj: object.NewRaw(),
}
}
func (s *simpleObjectWriter) WriteHeader(obj *object.Object) error {
func (s *SimpleObjectWriter) WriteHeader(obj *object.Object) error {
s.obj = object.NewRawFromObject(obj)
s.pld = make([]byte, 0, obj.PayloadSize())
@ -61,16 +57,12 @@ func (s *simpleObjectWriter) WriteHeader(obj *object.Object) error {
return nil
}
func (s *simpleObjectWriter) WriteChunk(p []byte) error {
func (s *SimpleObjectWriter) WriteChunk(p []byte) error {
s.pld = append(s.pld, p...)
return nil
}
func (s *simpleObjectWriter) Close() error {
return nil
}
func (s *simpleObjectWriter) object() *object.Object {
func (s *SimpleObjectWriter) Object() *object.Object {
if len(s.pld) > 0 {
s.obj.SetPayload(s.pld)
}
@ -86,16 +78,24 @@ func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (getClient,
}, err
}
func (c *clientWrapper) GetObject(ctx context.Context, p RangePrm) (*objectSDK.Object, error) {
func (c *clientWrapper) getObject(exec *execCtx) (*objectSDK.Object, error) {
if exec.headOnly() {
return c.client.GetObjectHeader(exec.context(),
new(client.ObjectHeaderParams).
WithAddress(exec.address()).
WithRawFlag(exec.isRaw()),
exec.callOptions()...,
)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if p.rng != nil {
data, err := c.client.ObjectPayloadRangeData(ctx,
if rng := exec.ctxRange(); rng != nil {
data, err := c.client.ObjectPayloadRangeData(exec.context(),
new(client.RangeDataParams).
WithAddress(p.Address()).
WithRange(p.rng).
WithRaw(p.RawFlag()),
p.callOpts...,
WithAddress(exec.address()).
WithRange(rng).
WithRaw(exec.isRaw()),
exec.callOptions()...,
)
if err != nil {
return nil, err
@ -103,22 +103,30 @@ func (c *clientWrapper) GetObject(ctx context.Context, p RangePrm) (*objectSDK.O
return payloadOnlyObject(data), nil
} else {
// we don't specify payload writer because we accumulate
// the object locally (even huge).
return c.client.GetObject(ctx,
return c.client.GetObject(exec.context(),
new(client.GetObjectParams).
WithAddress(p.Address()).
WithRawFlag(p.RawFlag()),
p.callOpts...,
WithAddress(exec.address()).
WithRawFlag(exec.isRaw()),
exec.callOptions()...,
)
}
}
func (e *storageEngineWrapper) Get(p RangePrm) (*object.Object, error) {
if p.rng != nil {
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
if exec.headOnly() {
r, err := e.engine.Head(new(engine.HeadPrm).
WithAddress(exec.address()).
WithRaw(exec.isRaw()),
)
if err != nil {
return nil, err
}
return r.Header(), nil
} else if rng := exec.ctxRange(); rng != nil {
r, err := e.engine.GetRange(new(engine.RngPrm).
WithAddress(p.Address()).
WithPayloadRange(p.rng),
WithAddress(exec.address()).
WithPayloadRange(rng),
)
if err != nil {
return nil, err
@ -127,7 +135,7 @@ func (e *storageEngineWrapper) Get(p RangePrm) (*object.Object, error) {
return r.Object(), nil
} else {
r, err := e.engine.Get(new(engine.GetPrm).
WithAddress(p.Address()),
WithAddress(exec.address()),
)
if err != nil {
return nil, err
@ -137,24 +145,14 @@ func (e *storageEngineWrapper) Get(p RangePrm) (*object.Object, error) {
}
}
func (s *headSvcWrapper) head(ctx context.Context, p Prm) (*object.Object, error) {
r, err := s.svc.Head(ctx, new(headsvc.Prm).
WithAddress(p.Address()).
WithCommonPrm(p.common).
Short(false),
)
if err != nil {
return nil, err
}
return r.Header(), nil
}
func (w *rangeWriter) WriteChunk(p []byte) error {
func (w *partWriter) WriteChunk(p []byte) error {
return w.chunkWriter.WriteChunk(p)
}
func (w *partWriter) WriteHeader(o *object.Object) error {
return w.headWriter.WriteHeader(o)
}
func payloadOnlyObject(payload []byte) *objectSDK.Object {
rawObj := object.NewRaw()
rawObj.SetPayload(payload)

View file

@ -91,6 +91,28 @@ func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRe
return toHashResponse(req.GetBody().GetType(), res), nil
}
// Head serves NeoFS API v2 compatible HEAD requests.
func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
resp := new(objectV2.HeadResponse)
resp.SetBody(new(objectV2.HeadResponseBody))
p, err := s.toHeadPrm(req, resp)
if err != nil {
return nil, err
}
err = s.svc.Head(ctx, *p)
var splitErr *object.SplitInfoError
if errors.As(err, &splitErr) {
setSplitInfoHeadResponse(splitErr.SplitInfo(), resp)
err = nil
}
return resp, err
}
func WithInternalService(v *getsvc.Service) Option {
return func(c *cfg) {
c.svc = v

View file

@ -6,11 +6,12 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/refs"
"github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
@ -30,7 +31,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
p.SetPrivateKey(key)
body := req.GetBody()
p.WithAddress(object.NewAddressFromV2(body.GetAddress()))
p.WithAddress(objectSDK.NewAddressFromV2(body.GetAddress()))
p.WithRawFlag(body.GetRaw())
p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...)
p.SetObjectWriter(&streamObjectWriter{stream})
@ -53,11 +54,11 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
p.SetPrivateKey(key)
body := req.GetBody()
p.WithAddress(object.NewAddressFromV2(body.GetAddress()))
p.WithAddress(objectSDK.NewAddressFromV2(body.GetAddress()))
p.WithRawFlag(body.GetRaw())
p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...)
p.SetChunkWriter(&streamObjectRangeWriter{stream})
p.SetRange(object.NewRangeFromV2(body.GetRange()))
p.SetRange(objectSDK.NewRangeFromV2(body.GetRange()))
p.SetCommonParameters(commonParameters(meta))
return p, nil
@ -75,15 +76,15 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
p.SetPrivateKey(key)
body := req.GetBody()
p.WithAddress(object.NewAddressFromV2(body.GetAddress()))
p.WithAddress(objectSDK.NewAddressFromV2(body.GetAddress()))
p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...)
p.SetCommonParameters(commonParameters(meta))
rngsV2 := body.GetRanges()
rngs := make([]*object.Range, 0, len(rngsV2))
rngs := make([]*objectSDK.Range, 0, len(rngsV2))
for i := range rngsV2 {
rngs = append(rngs, object.NewRangeFromV2(rngsV2[i]))
rngs = append(rngs, objectSDK.NewRangeFromV2(rngsV2[i]))
}
p.SetRangeList(rngs)
@ -104,6 +105,46 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
return p, nil
}
type headResponseWriter struct {
mainOnly bool
body *objectV2.HeadResponseBody
}
func (w *headResponseWriter) WriteHeader(hdr *object.Object) error {
if w.mainOnly {
w.body.SetHeaderPart(toShortObjectHeader(hdr))
} else {
w.body.SetHeaderPart(toFullObjectHeader(hdr))
}
return nil
}
func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
meta := req.GetMetaHeader()
key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken()))
if err != nil {
return nil, err
}
p := new(getsvc.HeadPrm)
p.SetPrivateKey(key)
body := req.GetBody()
p.WithAddress(objectSDK.NewAddressFromV2(body.GetAddress()))
p.WithRawFlag(body.GetRaw())
p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...)
p.SetHeaderWriter(&headResponseWriter{
mainOnly: body.GetMainOnly(),
body: resp.GetBody(),
})
p.SetCommonParameters(commonParameters(meta))
return p, nil
}
// can be shared accross all services
func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption {
xHdrs := meta.GetXHeaders()
@ -128,7 +169,7 @@ func commonParameters(meta *session.RequestMetaHeader) *util.CommonPrm {
WithLocalOnly(meta.GetTTL() <= 1)
}
func splitInfoResponse(info *object.SplitInfo) *objectV2.GetResponse {
func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse {
resp := new(objectV2.GetResponse)
body := new(objectV2.GetResponseBody)
@ -139,7 +180,7 @@ func splitInfoResponse(info *object.SplitInfo) *objectV2.GetResponse {
return resp
}
func splitInfoRangeResponse(info *object.SplitInfo) *objectV2.GetRangeResponse {
func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse {
resp := new(objectV2.GetRangeResponse)
body := new(objectV2.GetRangeResponseBody)
@ -150,6 +191,10 @@ func splitInfoRangeResponse(info *object.SplitInfo) *objectV2.GetRangeResponse {
return resp
}
func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResponse) {
resp.GetBody().SetHeaderPart(info.ToV2())
}
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
resp := new(objectV2.GetRangeHashResponse)
@ -161,3 +206,26 @@ func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.G
return resp
}
func toFullObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
obj := hdr.ToV2()
hs := new(objectV2.HeaderWithSignature)
hs.SetHeader(obj.GetHeader())
hs.SetSignature(obj.GetSignature())
return hs
}
func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
hdrV2 := hdr.ToV2().GetHeader()
sh := new(objectV2.ShortHeader)
sh.SetOwnerID(hdrV2.GetOwnerID())
sh.SetCreationEpoch(hdrV2.GetCreationEpoch())
sh.SetPayloadLength(hdrV2.GetPayloadLength())
sh.SetVersion(hdrV2.GetVersion())
sh.SetObjectType(hdrV2.GetObjectType())
return sh
}