From 0e1f05ff457ef6b5d213f49c7f95d7b63d785ecb Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 9 Dec 2020 13:32:33 +0300 Subject: [PATCH] [#239] object/head: Implement new service processing Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 11 +- pkg/services/object/get/exec.go | 47 ++++++-- pkg/services/object/get/get.go | 28 +++-- pkg/services/object/get/get_test.go | 152 ++++++++++++++++++++------ pkg/services/object/get/local.go | 2 +- pkg/services/object/get/prm.go | 24 +++- pkg/services/object/get/remote.go | 2 +- pkg/services/object/get/service.go | 18 +-- pkg/services/object/get/util.go | 96 ++++++++-------- pkg/services/object/get/v2/service.go | 22 ++++ pkg/services/object/get/v2/util.go | 86 +++++++++++++-- 11 files changed, 348 insertions(+), 140 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index eba8ca713..fc9e27044 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -25,7 +25,6 @@ import ( getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" getsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/get/v2" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" - headsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/head/v2" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" @@ -44,8 +43,6 @@ type objectSvc struct { search *searchsvcV2.Service - head *headsvcV2.Service - get *getsvcV2.Service delete *deletesvcV2.Service @@ -140,7 +137,7 @@ func (s *objectSvc) Put(ctx context.Context) (object.PutObjectStreamer, error) { } func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { - return s.head.Head(ctx, req) + return s.get.Head(ctx, req) } func (s *objectSvc) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { @@ -342,15 +339,10 @@ func initObjectService(c *cfg) { ), ) - sHeadV2 := headsvcV2.NewService( - headsvcV2.WithInternalService(sHead), - ) - sGet := getsvc.New( getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), getsvc.WithClientCache(clientCache), - getsvc.WithHeadService(sHead), getsvc.WithClientOptions( client.WithDialTimeout(c.viper.GetDuration(cfgObjectGetDialTimeout)), ), @@ -403,7 +395,6 @@ func initObjectService(c *cfg) { &objectSvc{ put: sPutV2, search: sSearchV2, - head: sHeadV2, get: sGetV2, delete: sDeleteV2, }, diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 439d382be..fe96ac394 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -35,8 +35,12 @@ type execCtx struct { collectedObject *object.Object curOff uint64 + + head bool } +type execOption func(*execCtx) + const ( statusUndefined int = iota statusOK @@ -45,9 +49,23 @@ const ( statusOutOfRange ) +func headOnly() execOption { + return func(c *execCtx) { + c.head = true + } +} + +func withPayloadRange(r *objectSDK.Range) execOption { + return func(c *execCtx) { + c.prm.rng = r + } +} + func (exec *execCtx) setLogger(l *logger.Logger) { req := "GET" - if exec.ctxRange() != nil { + if exec.headOnly() { + req = "HEAD" + } else if exec.ctxRange() != nil { req = "GET_RANGE" } @@ -92,7 +110,7 @@ func (exec execCtx) remotePrm() *client.GetObjectParams { } func (exec *execCtx) canAssemble() bool { - return exec.svc.assembly && !exec.isRaw() + return exec.svc.assembly && !exec.isRaw() && !exec.headOnly() } func (exec *execCtx) splitInfo() *objectSDK.SplitInfo { @@ -107,6 +125,10 @@ func (exec *execCtx) ctxRange() *objectSDK.Range { return exec.prm.rng } +func (exec *execCtx) headOnly() bool { + return exec.head +} + func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) { t, err := exec.svc.traverserGenerator.GenerateTraverser(addr) @@ -126,7 +148,7 @@ func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Trav } func (exec *execCtx) getChild(id *objectSDK.ID, rng *objectSDK.Range) (*object.Object, bool) { - w := newSimpleObjectWriter() + w := NewSimpleObjectWriter() p := exec.prm p.common = p.common.WithLocalOnly(false) @@ -139,9 +161,9 @@ func (exec *execCtx) getChild(id *objectSDK.ID, rng *objectSDK.Range) (*object.O p.WithAddress(addr) - exec.statusError = exec.svc.get(exec.context(), p) + exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng)) - return w.object(), exec.status == statusOK + return w.Object(), exec.status == statusOK } func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) { @@ -153,9 +175,14 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) { p.common = p.common.WithLocalOnly(false) p.WithAddress(childAddr) - header, err := exec.svc.headSvc.head(exec.context(), Prm{ + prm := HeadPrm{ commonPrm: p.commonPrm, - }) + } + + w := NewSimpleObjectWriter() + prm.SetHeaderWriter(w) + + err := exec.svc.Head(exec.context(), prm) switch { default: @@ -172,7 +199,7 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) { exec.status = statusOK exec.err = nil - return header, true + return w.Object(), true } } @@ -244,6 +271,10 @@ func (exec *execCtx) writeCollectedHeader() bool { } func (exec *execCtx) writeObjectPayload(obj *object.Object) bool { + if exec.headOnly() { + return true + } + err := exec.prm.objWriter.WriteChunk(obj.Payload()) switch { diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index a5de66f20..1e0ae0110 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -9,14 +9,12 @@ import ( // Get serves a request to get an object by address, and returns Streamer instance. func (s *Service) Get(ctx context.Context, prm Prm) error { - return s.get(ctx, RangePrm{ - commonPrm: prm.commonPrm, - }).err + return s.get(ctx, prm.commonPrm).err } // GetRange serves a request to get an object by address, and returns Streamer instance. func (s *Service) GetRange(ctx context.Context, prm RangePrm) error { - return s.get(ctx, prm).err + return s.get(ctx, prm.commonPrm, withPayloadRange(prm.rng)).err } func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) { @@ -50,14 +48,28 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas }, nil } -func (s *Service) get(ctx context.Context, prm RangePrm) statusError { +// Head reads object header from container. +// +// Returns ErrNotFound if the header was not received for the call. +// Returns SplitInfoError if object is virtual and raw flag is set. +func (s *Service) Head(ctx context.Context, prm HeadPrm) error { + return s.get(ctx, prm.commonPrm, headOnly()).err +} + +func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError { exec := &execCtx{ - svc: s, - ctx: ctx, - prm: prm, + svc: s, + ctx: ctx, + prm: RangePrm{ + commonPrm: prm, + }, infoSplit: objectSDK.NewSplitInfo(), } + for i := range opts { + opts[i](exec) + } + exec.setLogger(s.log) exec.execute() diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index b6a9f0217..5475c4187 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -93,8 +93,8 @@ func newTestClient() *testClient { } } -func (c *testClient) GetObject(_ context.Context, p RangePrm) (*objectSDK.Object, error) { - v, ok := c.results[p.Address().String()] +func (c *testClient) getObject(exec *execCtx) (*objectSDK.Object, error) { + v, ok := c.results[exec.address().String()] if !ok { return nil, object.ErrNotFound } @@ -103,7 +103,7 @@ func (c *testClient) GetObject(_ context.Context, p RangePrm) (*objectSDK.Object return nil, v.err } - return cutToRange(v.obj.Object(), p.rng).SDK(), nil + return cutToRange(v.obj.Object(), exec.ctxRange()).SDK(), nil } func (c *testClient) head(_ context.Context, p Prm) (*object.Object, error) { @@ -126,11 +126,11 @@ func (c *testClient) addResult(addr *objectSDK.Address, obj *object.RawObject, e }{obj: obj, err: err} } -func (s *testStorage) Get(p RangePrm) (*object.Object, error) { +func (s *testStorage) get(exec *execCtx) (*object.Object, error) { var ( ok bool obj *object.Object - sAddr = p.Address().String() + sAddr = exec.address().String() ) if _, ok = s.inhumed[sAddr]; ok { @@ -142,7 +142,7 @@ func (s *testStorage) Get(p RangePrm) (*object.Object, error) { } if obj, ok = s.phy[sAddr]; ok { - return cutToRange(obj, p.rng), nil + return cutToRange(obj, exec.ctxRange()), nil } return nil, object.ErrNotFound @@ -207,6 +207,7 @@ func generateObject(addr *objectSDK.Address, prev *objectSDK.ID, payload []byte, obj.SetContainerID(addr.ContainerID()) obj.SetID(addr.ObjectID()) obj.SetPayload(payload) + obj.SetPayloadSize(uint64(len(payload))) obj.SetPreviousID(prev) obj.SetChildren(children...) @@ -249,11 +250,20 @@ func TestGetLocalOnly(t *testing.T) { return p } + newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm { + p := HeadPrm{} + p.SetHeaderWriter(w) + p.WithRawFlag(raw) + p.common = new(util.CommonPrm).WithLocalOnly(true) + + return p + } + t.Run("OK", func(t *testing.T) { storage := newTestStorage() svc := newSvc(storage) - w := newSimpleObjectWriter() + w := NewSimpleObjectWriter() p := newPrm(false, w) payloadSz := uint64(10) @@ -273,16 +283,24 @@ func TestGetLocalOnly(t *testing.T) { require.NoError(t, err) - require.Equal(t, obj.Object(), w.object()) + require.Equal(t, obj.Object(), w.Object()) - w = newSimpleObjectWriter() + w = NewSimpleObjectWriter() rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3) rngPrm.WithAddress(addr) err = svc.GetRange(ctx, rngPrm) require.NoError(t, err) - require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.object().Payload()) + require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.Object().Payload()) + + w = NewSimpleObjectWriter() + headPrm := newHeadPrm(false, w) + headPrm.WithAddress(addr) + + err = svc.Head(ctx, headPrm) + require.NoError(t, err) + require.Equal(t, obj.CutPayload().Object(), w.Object()) }) t.Run("INHUMED", func(t *testing.T) { @@ -306,6 +324,12 @@ func TestGetLocalOnly(t *testing.T) { err = svc.GetRange(ctx, rngPrm) require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) + + headPrm := newHeadPrm(false, nil) + headPrm.WithAddress(addr) + + err = svc.Head(ctx, headPrm) + require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) }) t.Run("404", func(t *testing.T) { @@ -328,6 +352,12 @@ func TestGetLocalOnly(t *testing.T) { err = svc.GetRange(ctx, rngPrm) require.True(t, errors.Is(err, object.ErrNotFound)) + + headPrm := newHeadPrm(false, nil) + headPrm.WithAddress(addr) + + err = svc.Head(ctx, headPrm) + require.True(t, errors.Is(err, object.ErrNotFound)) }) t.Run("VIRTUAL", func(t *testing.T) { @@ -361,6 +391,13 @@ func TestGetLocalOnly(t *testing.T) { err = svc.Get(ctx, p) require.True(t, errors.As(err, &errSplit)) + + headPrm := newHeadPrm(true, nil) + headPrm.WithAddress(addr) + + err = svc.Head(ctx, headPrm) + require.True(t, errors.As(err, &errSplit)) + require.Equal(t, splitInfo, errSplit.SplitInfo()) }) } @@ -440,7 +477,7 @@ func TestGetRemoteSmall(t *testing.T) { newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service { svc := &Service{cfg: new(cfg)} - svc.log = test.NewLogger(true) + svc.log = test.NewLogger(false) svc.localStorage = newTestStorage() svc.assembly = true svc.traverserGenerator = &testTraverserGenerator{ @@ -476,6 +513,15 @@ func TestGetRemoteSmall(t *testing.T) { return p } + newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm { + p := HeadPrm{} + p.SetHeaderWriter(w) + p.WithRawFlag(raw) + p.common = new(util.CommonPrm).WithLocalOnly(false) + + return p + } + t.Run("OK", func(t *testing.T) { addr := generateAddress() addr.SetContainerID(cid) @@ -507,28 +553,36 @@ func TestGetRemoteSmall(t *testing.T) { }, }) - w := newSimpleObjectWriter() + w := NewSimpleObjectWriter() p := newPrm(false, w) p.WithAddress(addr) err := svc.Get(ctx, p) require.NoError(t, err) - require.Equal(t, obj.Object(), w.object()) + require.Equal(t, obj.Object(), w.Object()) *c1, *c2 = *c2, *c1 err = svc.Get(ctx, p) require.NoError(t, err) - require.Equal(t, obj.Object(), w.object()) + require.Equal(t, obj.Object(), w.Object()) - w = newSimpleObjectWriter() + w = NewSimpleObjectWriter() rngPrm := newRngPrm(false, w, payloadSz/3, payloadSz/3) rngPrm.WithAddress(addr) err = svc.GetRange(ctx, rngPrm) require.NoError(t, err) - require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.object().Payload()) + require.Equal(t, payload[payloadSz/3:2*payloadSz/3], w.Object().Payload()) + + w = NewSimpleObjectWriter() + headPrm := newHeadPrm(false, w) + headPrm.WithAddress(addr) + + err = svc.Head(ctx, headPrm) + require.NoError(t, err) + require.Equal(t, obj.CutPayload().Object(), w.Object()) }) t.Run("INHUMED", func(t *testing.T) { @@ -567,6 +621,12 @@ func TestGetRemoteSmall(t *testing.T) { err = svc.GetRange(ctx, rngPrm) require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) + + headPrm := newHeadPrm(false, nil) + headPrm.WithAddress(addr) + + err = svc.Head(ctx, headPrm) + require.True(t, errors.Is(err, object.ErrAlreadyRemoved)) }) t.Run("404", func(t *testing.T) { @@ -605,9 +665,26 @@ func TestGetRemoteSmall(t *testing.T) { err = svc.GetRange(ctx, rngPrm) require.True(t, errors.Is(err, object.ErrNotFound)) + + headPrm := newHeadPrm(false, nil) + headPrm.WithAddress(addr) + + err = svc.Head(ctx, headPrm) + require.True(t, errors.Is(err, object.ErrNotFound)) }) t.Run("VIRTUAL", func(t *testing.T) { + testHeadVirtual := func(svc *Service, addr *objectSDK.Address, i *objectSDK.SplitInfo) { + headPrm := newHeadPrm(false, nil) + headPrm.WithAddress(addr) + + errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo()) + + err := svc.Head(ctx, headPrm) + require.True(t, errors.As(err, &errSplit)) + require.Equal(t, i, errSplit.SplitInfo()) + } + t.Run("linking", func(t *testing.T) { t.Run("get linking failure", func(t *testing.T) { addr := generateAddress() @@ -645,6 +722,8 @@ func TestGetRemoteSmall(t *testing.T) { }, }) + testHeadVirtual(svc, addr, splitInfo) + p := newPrm(false, nil) p.WithAddress(addr) @@ -717,15 +796,15 @@ func TestGetRemoteSmall(t *testing.T) { }, }) - p := newPrm(false, newSimpleObjectWriter()) + testHeadVirtual(svc, addr, splitInfo) + + p := newPrm(false, NewSimpleObjectWriter()) p.WithAddress(addr) err := svc.Get(ctx, p) require.True(t, errors.Is(err, object.ErrNotFound)) - svc.headSvc = c2 - - rngPrm := newRngPrm(false, newSimpleObjectWriter(), 0, 1) + rngPrm := newRngPrm(false, NewSimpleObjectWriter(), 0, 1) rngPrm.WithAddress(addr) err = svc.GetRange(ctx, rngPrm) @@ -792,18 +871,18 @@ func TestGetRemoteSmall(t *testing.T) { }, }) - w := newSimpleObjectWriter() + testHeadVirtual(svc, addr, splitInfo) + + w := NewSimpleObjectWriter() p := newPrm(false, w) p.WithAddress(addr) err := svc.Get(ctx, p) require.NoError(t, err) - require.Equal(t, srcObj.Object(), w.object()) + require.Equal(t, srcObj.Object(), w.Object()) - svc.headSvc = c2 - - w = newSimpleObjectWriter() + w = NewSimpleObjectWriter() payloadSz := srcObj.PayloadSize() off := payloadSz / 3 @@ -814,7 +893,7 @@ func TestGetRemoteSmall(t *testing.T) { err = svc.GetRange(ctx, rngPrm) require.NoError(t, err) - require.Equal(t, payload[off:off+ln], w.object().Payload()) + require.Equal(t, payload[off:off+ln], w.Object().Payload()) }) }) @@ -855,6 +934,8 @@ func TestGetRemoteSmall(t *testing.T) { }, }) + testHeadVirtual(svc, addr, splitInfo) + p := newPrm(false, nil) p.WithAddress(addr) @@ -917,11 +998,12 @@ func TestGetRemoteSmall(t *testing.T) { }, }) + testHeadVirtual(svc, addr, splitInfo) + headSvc := newTestClient() headSvc.addResult(preRightAddr, nil, object.ErrNotFound) - svc.headSvc = headSvc - p := newPrm(false, newSimpleObjectWriter()) + p := newPrm(false, NewSimpleObjectWriter()) p.WithAddress(addr) err := svc.Get(ctx, p) @@ -987,18 +1069,18 @@ func TestGetRemoteSmall(t *testing.T) { }, }) - svc.headSvc = c2 + testHeadVirtual(svc, addr, splitInfo) - w := newSimpleObjectWriter() + w := NewSimpleObjectWriter() p := newPrm(false, w) p.WithAddress(addr) err := svc.Get(ctx, p) require.NoError(t, err) - require.Equal(t, srcObj.Object(), w.object()) + require.Equal(t, srcObj.Object(), w.Object()) - w = newSimpleObjectWriter() + w = NewSimpleObjectWriter() payloadSz := srcObj.PayloadSize() off := payloadSz / 3 @@ -1009,9 +1091,9 @@ func TestGetRemoteSmall(t *testing.T) { err = svc.GetRange(ctx, rngPrm) require.NoError(t, err) - require.Equal(t, payload[off:off+ln], w.object().Payload()) + require.Equal(t, payload[off:off+ln], w.Object().Payload()) - w = newSimpleObjectWriter() + w = NewSimpleObjectWriter() off = payloadSz - 2 ln = 1 @@ -1020,7 +1102,7 @@ func TestGetRemoteSmall(t *testing.T) { err = svc.GetRange(ctx, rngPrm) require.NoError(t, err) - require.Equal(t, payload[off:off+ln], w.object().Payload()) + require.Equal(t, payload[off:off+ln], w.Object().Payload()) }) }) }) diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index 7e59f4838..bdd5c304d 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -10,7 +10,7 @@ import ( func (exec *execCtx) executeLocal() { var err error - exec.collectedObject, err = exec.svc.localStorage.Get(exec.prm) + exec.collectedObject, err = exec.svc.localStorage.get(exec) var errSplitInfo *objectSDK.SplitInfoError diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index 1aa4d729f..aab3ec57a 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -31,6 +31,11 @@ type RangeHashPrm struct { rngs []*objectSDK.Range } +// HeadPrm groups parameters of Head service call. +type HeadPrm struct { + commonPrm +} + type commonPrm struct { objWriter ObjectWriter @@ -50,9 +55,15 @@ type ChunkWriter interface { WriteChunk([]byte) error } +// HeaderWriter is an interface of target component +// to write object header. +type HeaderWriter interface { + WriteHeader(*object.Object) error +} + // ObjectWriter is an interface of target component to write object. type ObjectWriter interface { - WriteHeader(*object.Object) error + HeaderWriter ChunkWriter } @@ -71,9 +82,9 @@ func (p *commonPrm) SetRemoteCallOptions(opts ...client.CallOption) { p.callOpts = opts } -// SetObjectWriter sets target component to write the object payload range. +// SetChunkWriter sets target component to write the object payload range. func (p *RangePrm) SetChunkWriter(w ChunkWriter) { - p.objWriter = &rangeWriter{ + p.objWriter = &partWriter{ chunkWriter: w, } } @@ -97,3 +108,10 @@ func (p *RangeHashPrm) SetHashGenerator(v func() hash.Hash) { func (p *commonPrm) SetCommonParameters(common *util.CommonPrm) { p.common = common } + +// SetHeaderWriter sets target component to write the object header. +func (p *HeadPrm) SetHeaderWriter(w HeaderWriter) { + p.objWriter = &partWriter{ + headWriter: w, + } +} diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index ee73db205..b6001d0d0 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -20,7 +20,7 @@ func (exec *execCtx) processNode(ctx context.Context, addr *network.Address) boo return true } - obj, err := client.GetObject(ctx, exec.prm) + obj, err := client.getObject(exec) var errSplitInfo *objectSDK.SplitInfoError diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index c89e918c9..24a23791b 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,7 +1,6 @@ package getsvc import ( - "context" "crypto/ecdsa" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -9,7 +8,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" - headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" @@ -25,7 +23,7 @@ type Service struct { type Option func(*cfg) type getClient interface { - GetObject(context.Context, RangePrm) (*objectSDK.Object, error) + getObject(*execCtx) (*objectSDK.Object, error) } type cfg struct { @@ -33,12 +31,8 @@ type cfg struct { log *logger.Logger - headSvc interface { - head(context.Context, Prm) (*object.Object, error) - } - localStorage interface { - Get(RangePrm) (*object.Object, error) + get(*execCtx) (*object.Object, error) } clientCache interface { @@ -54,7 +48,6 @@ func defaultCfg() *cfg { return &cfg{ assembly: true, log: zap.L(), - headSvc: new(headSvcWrapper), localStorage: new(storageEngineWrapper), clientCache: new(clientCacheWrapper), } @@ -117,10 +110,3 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option { c.traverserGenerator = t } } - -// WithHeadService returns option to set the utility serving object.Head. -func WithHeadService(svc *headsvc.Service) Option { - return func(c *cfg) { - c.headSvc.(*headSvcWrapper).svc = svc - } -} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 5b026bd00..4947e4786 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -1,7 +1,6 @@ package getsvc import ( - "context" "crypto/ecdsa" "hash" @@ -10,10 +9,9 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" - headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" ) -type simpleObjectWriter struct { +type SimpleObjectWriter struct { obj *object.RawObject pld []byte @@ -33,13 +31,11 @@ type storageEngineWrapper struct { engine *engine.StorageEngine } -type headSvcWrapper struct { - svc *headsvc.Service -} - -type rangeWriter struct { +type partWriter struct { ObjectWriter + headWriter HeaderWriter + chunkWriter ChunkWriter } @@ -47,13 +43,13 @@ type hasherWrapper struct { hash hash.Hash } -func newSimpleObjectWriter() *simpleObjectWriter { - return &simpleObjectWriter{ +func NewSimpleObjectWriter() *SimpleObjectWriter { + return &SimpleObjectWriter{ obj: object.NewRaw(), } } -func (s *simpleObjectWriter) WriteHeader(obj *object.Object) error { +func (s *SimpleObjectWriter) WriteHeader(obj *object.Object) error { s.obj = object.NewRawFromObject(obj) s.pld = make([]byte, 0, obj.PayloadSize()) @@ -61,16 +57,12 @@ func (s *simpleObjectWriter) WriteHeader(obj *object.Object) error { return nil } -func (s *simpleObjectWriter) WriteChunk(p []byte) error { +func (s *SimpleObjectWriter) WriteChunk(p []byte) error { s.pld = append(s.pld, p...) return nil } -func (s *simpleObjectWriter) Close() error { - return nil -} - -func (s *simpleObjectWriter) object() *object.Object { +func (s *SimpleObjectWriter) Object() *object.Object { if len(s.pld) > 0 { s.obj.SetPayload(s.pld) } @@ -86,16 +78,24 @@ func (c *clientCacheWrapper) get(key *ecdsa.PrivateKey, addr string) (getClient, }, err } -func (c *clientWrapper) GetObject(ctx context.Context, p RangePrm) (*objectSDK.Object, error) { +func (c *clientWrapper) getObject(exec *execCtx) (*objectSDK.Object, error) { + if exec.headOnly() { + return c.client.GetObjectHeader(exec.context(), + new(client.ObjectHeaderParams). + WithAddress(exec.address()). + WithRawFlag(exec.isRaw()), + exec.callOptions()..., + ) + } // we don't specify payload writer because we accumulate // the object locally (even huge). - if p.rng != nil { - data, err := c.client.ObjectPayloadRangeData(ctx, + if rng := exec.ctxRange(); rng != nil { + data, err := c.client.ObjectPayloadRangeData(exec.context(), new(client.RangeDataParams). - WithAddress(p.Address()). - WithRange(p.rng). - WithRaw(p.RawFlag()), - p.callOpts..., + WithAddress(exec.address()). + WithRange(rng). + WithRaw(exec.isRaw()), + exec.callOptions()..., ) if err != nil { return nil, err @@ -103,22 +103,30 @@ func (c *clientWrapper) GetObject(ctx context.Context, p RangePrm) (*objectSDK.O return payloadOnlyObject(data), nil } else { - // we don't specify payload writer because we accumulate - // the object locally (even huge). - return c.client.GetObject(ctx, + return c.client.GetObject(exec.context(), new(client.GetObjectParams). - WithAddress(p.Address()). - WithRawFlag(p.RawFlag()), - p.callOpts..., + WithAddress(exec.address()). + WithRawFlag(exec.isRaw()), + exec.callOptions()..., ) } } -func (e *storageEngineWrapper) Get(p RangePrm) (*object.Object, error) { - if p.rng != nil { +func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { + if exec.headOnly() { + r, err := e.engine.Head(new(engine.HeadPrm). + WithAddress(exec.address()). + WithRaw(exec.isRaw()), + ) + if err != nil { + return nil, err + } + + return r.Header(), nil + } else if rng := exec.ctxRange(); rng != nil { r, err := e.engine.GetRange(new(engine.RngPrm). - WithAddress(p.Address()). - WithPayloadRange(p.rng), + WithAddress(exec.address()). + WithPayloadRange(rng), ) if err != nil { return nil, err @@ -127,7 +135,7 @@ func (e *storageEngineWrapper) Get(p RangePrm) (*object.Object, error) { return r.Object(), nil } else { r, err := e.engine.Get(new(engine.GetPrm). - WithAddress(p.Address()), + WithAddress(exec.address()), ) if err != nil { return nil, err @@ -137,22 +145,12 @@ func (e *storageEngineWrapper) Get(p RangePrm) (*object.Object, error) { } } -func (s *headSvcWrapper) head(ctx context.Context, p Prm) (*object.Object, error) { - r, err := s.svc.Head(ctx, new(headsvc.Prm). - WithAddress(p.Address()). - WithCommonPrm(p.common). - Short(false), - ) - - if err != nil { - return nil, err - } - - return r.Header(), nil +func (w *partWriter) WriteChunk(p []byte) error { + return w.chunkWriter.WriteChunk(p) } -func (w *rangeWriter) WriteChunk(p []byte) error { - return w.chunkWriter.WriteChunk(p) +func (w *partWriter) WriteHeader(o *object.Object) error { + return w.headWriter.WriteHeader(o) } func payloadOnlyObject(payload []byte) *objectSDK.Object { diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index 27f90ab4f..f2e408534 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -91,6 +91,28 @@ func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRe return toHashResponse(req.GetBody().GetType(), res), nil } +// Head serves NeoFS API v2 compatible HEAD requests. +func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { + resp := new(objectV2.HeadResponse) + resp.SetBody(new(objectV2.HeadResponseBody)) + + p, err := s.toHeadPrm(req, resp) + if err != nil { + return nil, err + } + + err = s.svc.Head(ctx, *p) + + var splitErr *object.SplitInfoError + + if errors.As(err, &splitErr) { + setSplitInfoHeadResponse(splitErr.SplitInfo(), resp) + err = nil + } + + return resp, err +} + func WithInternalService(v *getsvc.Service) Option { return func(c *cfg) { c.svc = v diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index a0b243d19..bec5c82d5 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -6,11 +6,12 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/object" + objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/token" objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-node/pkg/core/object" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -30,7 +31,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre p.SetPrivateKey(key) body := req.GetBody() - p.WithAddress(object.NewAddressFromV2(body.GetAddress())) + p.WithAddress(objectSDK.NewAddressFromV2(body.GetAddress())) p.WithRawFlag(body.GetRaw()) p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...) p.SetObjectWriter(&streamObjectWriter{stream}) @@ -53,11 +54,11 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get p.SetPrivateKey(key) body := req.GetBody() - p.WithAddress(object.NewAddressFromV2(body.GetAddress())) + p.WithAddress(objectSDK.NewAddressFromV2(body.GetAddress())) p.WithRawFlag(body.GetRaw()) p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...) p.SetChunkWriter(&streamObjectRangeWriter{stream}) - p.SetRange(object.NewRangeFromV2(body.GetRange())) + p.SetRange(objectSDK.NewRangeFromV2(body.GetRange())) p.SetCommonParameters(commonParameters(meta)) return p, nil @@ -75,15 +76,15 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran p.SetPrivateKey(key) body := req.GetBody() - p.WithAddress(object.NewAddressFromV2(body.GetAddress())) + p.WithAddress(objectSDK.NewAddressFromV2(body.GetAddress())) p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...) p.SetCommonParameters(commonParameters(meta)) rngsV2 := body.GetRanges() - rngs := make([]*object.Range, 0, len(rngsV2)) + rngs := make([]*objectSDK.Range, 0, len(rngsV2)) for i := range rngsV2 { - rngs = append(rngs, object.NewRangeFromV2(rngsV2[i])) + rngs = append(rngs, objectSDK.NewRangeFromV2(rngsV2[i])) } p.SetRangeList(rngs) @@ -104,6 +105,46 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran return p, nil } +type headResponseWriter struct { + mainOnly bool + + body *objectV2.HeadResponseBody +} + +func (w *headResponseWriter) WriteHeader(hdr *object.Object) error { + if w.mainOnly { + w.body.SetHeaderPart(toShortObjectHeader(hdr)) + } else { + w.body.SetHeaderPart(toFullObjectHeader(hdr)) + } + + return nil +} + +func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { + meta := req.GetMetaHeader() + + key, err := s.keyStorage.GetKey(token.NewSessionTokenFromV2(meta.GetSessionToken())) + if err != nil { + return nil, err + } + + p := new(getsvc.HeadPrm) + p.SetPrivateKey(key) + + body := req.GetBody() + p.WithAddress(objectSDK.NewAddressFromV2(body.GetAddress())) + p.WithRawFlag(body.GetRaw()) + p.SetRemoteCallOptions(remoteCallOptionsFromMeta(meta)...) + p.SetHeaderWriter(&headResponseWriter{ + mainOnly: body.GetMainOnly(), + body: resp.GetBody(), + }) + p.SetCommonParameters(commonParameters(meta)) + + return p, nil +} + // can be shared accross all services func remoteCallOptionsFromMeta(meta *session.RequestMetaHeader) []client.CallOption { xHdrs := meta.GetXHeaders() @@ -128,7 +169,7 @@ func commonParameters(meta *session.RequestMetaHeader) *util.CommonPrm { WithLocalOnly(meta.GetTTL() <= 1) } -func splitInfoResponse(info *object.SplitInfo) *objectV2.GetResponse { +func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse { resp := new(objectV2.GetResponse) body := new(objectV2.GetResponseBody) @@ -139,7 +180,7 @@ func splitInfoResponse(info *object.SplitInfo) *objectV2.GetResponse { return resp } -func splitInfoRangeResponse(info *object.SplitInfo) *objectV2.GetRangeResponse { +func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse { resp := new(objectV2.GetRangeResponse) body := new(objectV2.GetRangeResponseBody) @@ -150,6 +191,10 @@ func splitInfoRangeResponse(info *object.SplitInfo) *objectV2.GetRangeResponse { return resp } +func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResponse) { + resp.GetBody().SetHeaderPart(info.ToV2()) +} + func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse { resp := new(objectV2.GetRangeHashResponse) @@ -161,3 +206,26 @@ func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.G return resp } + +func toFullObjectHeader(hdr *object.Object) objectV2.GetHeaderPart { + obj := hdr.ToV2() + + hs := new(objectV2.HeaderWithSignature) + hs.SetHeader(obj.GetHeader()) + hs.SetSignature(obj.GetSignature()) + + return hs +} + +func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart { + hdrV2 := hdr.ToV2().GetHeader() + + sh := new(objectV2.ShortHeader) + sh.SetOwnerID(hdrV2.GetOwnerID()) + sh.SetCreationEpoch(hdrV2.GetCreationEpoch()) + sh.SetPayloadLength(hdrV2.GetPayloadLength()) + sh.SetVersion(hdrV2.GetVersion()) + sh.SetObjectType(hdrV2.GetObjectType()) + + return sh +}