From 1e170c38128c6a541795d03a5b5638b7563a2e12 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 12 Jan 2021 17:55:02 +0300 Subject: [PATCH] [#234] services/object: Support netmap epoch and lookup dead in read ops Support processing of NetmapEpoch and NetmapLookupDepth X-headers when processing object read operations. Placement for operations Get/Head/GetRange/GetRangeHash/Search is built for the epoch specified in NetmapEpoch X-header (by default latest). Also the specified operations are processed until success is achieved for network maps from the past up to NetmapLookupDepth value. Behavior for default values (zero or missing) left unchanged. Signed-off-by: Leonard Lyubich --- cmd/neofs-cli/modules/object.go | 2 +- cmd/neofs-cli/modules/storagegroup.go | 2 +- cmd/neofs-node/object.go | 2 + go.mod | 2 +- go.sum | Bin 58914 -> 58974 bytes pkg/services/object/delete/v2/util.go | 7 +- pkg/services/object/get/container.go | 45 +++++- pkg/services/object/get/exec.go | 41 +++++- pkg/services/object/get/get_test.go | 144 +++++++++++++++++- pkg/services/object/get/service.go | 17 ++- pkg/services/object/get/util.go | 9 ++ pkg/services/object/get/v2/util.go | 28 +++- pkg/services/object/put/v2/streamer.go | 7 +- pkg/services/object/put/v2/util.go | 9 +- pkg/services/object/search/container.go | 46 +++++- pkg/services/object/search/exec.go | 41 +++++- pkg/services/object/search/search_test.go | 169 +++++++++++++++++----- pkg/services/object/search/service.go | 17 ++- pkg/services/object/search/util.go | 13 +- pkg/services/object/search/v2/util.go | 7 +- pkg/services/object/util/placement.go | 11 +- pkg/services/object/util/prm.go | 84 +++++++++-- 22 files changed, 613 insertions(+), 90 deletions(-) diff --git a/cmd/neofs-cli/modules/object.go b/cmd/neofs-cli/modules/object.go index 1ef48855..8df0848d 100644 --- a/cmd/neofs-cli/modules/object.go +++ b/cmd/neofs-cli/modules/object.go @@ -264,7 +264,7 @@ func deleteObject(cmd *cobra.Command, _ []string) error { return err } - tombstoneAddr, err := client.DeleteObject(cli, ctx, + tombstoneAddr, err := client.DeleteObject(ctx, cli, new(client.DeleteObjectParams).WithAddress(objAddr), append(globalCallOptions(), client.WithSession(tok), diff --git a/cmd/neofs-cli/modules/storagegroup.go b/cmd/neofs-cli/modules/storagegroup.go index 1626e37c..cd76ed9a 100644 --- a/cmd/neofs-cli/modules/storagegroup.go +++ b/cmd/neofs-cli/modules/storagegroup.go @@ -289,7 +289,7 @@ func delSG(cmd *cobra.Command, _ []string) error { addr.SetContainerID(cid) addr.SetObjectID(id) - tombstone, err := client.DeleteObject(cli, ctx, + tombstone, err := client.DeleteObject(ctx, cli, new(client.DeleteObjectParams). WithAddress(addr), client.WithSession(tok)) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 223da2e1..715cc83a 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -242,6 +242,7 @@ func initObjectService(c *cfg) { placement.WithoutSuccessTracking(), ), ), + searchsvc.WithNetMapSource(c.cfgNetmap.wrapper), ) sSearchV2 := searchsvcV2.NewService( @@ -261,6 +262,7 @@ func initObjectService(c *cfg) { placement.SuccessAfter(1), ), ), + getsvc.WithNetMapSource(c.cfgNetmap.wrapper), ) sGetV2 := getsvcV2.NewService( diff --git a/go.mod b/go.mod index ba472fe7..f0501e41 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.92.0 - github.com/nspcc-dev/neofs-api-go v1.22.0 + github.com/nspcc-dev/neofs-api-go v1.22.1-0.20210112152207-43c579f6704d github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index d2f3b3cbbc483ae7d9c8a9009f5a0ddebd1b74d0..29b9824fb7f80df6398dd6dcd2a7894796412aa8 100644 GIT binary patch delta 133 zcmZ2 0 { + return true + } + + e, err := exec.svc.currentEpochReceiver.currentEpoch() + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not get current epoch number", + zap.String("error", err.Error()), + ) + + return false + case err == nil: + exec.curProcEpoch = e + return true + } +} + func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) { - t, err := exec.svc.traverserGenerator.GenerateTraverser(addr) + t, err := exec.svc.traverserGenerator.GenerateTraverser(addr, exec.curProcEpoch) switch { default: diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 63bea462..f5df9cdf 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -31,7 +31,7 @@ type testStorage struct { type testTraverserGenerator struct { c *container.Container - b placement.Builder + b map[uint64]placement.Builder } type testPlacementBuilder struct { @@ -49,6 +49,12 @@ type testClient struct { } } +type testEpochReceiver uint64 + +func (e testEpochReceiver) currentEpoch() (uint64, error) { + return uint64(e), nil +} + func newTestStorage() *testStorage { return &testStorage{ inhumed: make(map[string]struct{}), @@ -57,11 +63,11 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(addr *objectSDK.Address) (*placement.Traverser, error) { +func (g *testTraverserGenerator) GenerateTraverser(addr *objectSDK.Address, e uint64) (*placement.Traverser, error) { return placement.NewTraverser( placement.ForContainer(g.c), placement.ForObject(addr.ObjectID()), - placement.UseBuilder(g.b), + placement.UseBuilder(g.b[e]), placement.SuccessAfter(1), ) } @@ -467,11 +473,17 @@ func TestGetRemoteSmall(t *testing.T) { svc.log = test.NewLogger(false) svc.localStorage = newTestStorage() svc.assembly = true + + const curEpoch = 13 + svc.traverserGenerator = &testTraverserGenerator{ c: cnr, - b: b, + b: map[uint64]placement.Builder{ + curEpoch: b, + }, } svc.clientCache = c + svc.currentEpochReceiver = testEpochReceiver(curEpoch) return svc } @@ -1095,3 +1107,127 @@ func TestGetRemoteSmall(t *testing.T) { }) }) } + +func TestGetFromPastEpoch(t *testing.T) { + ctx := context.Background() + + cnr := container.New(container.WithPolicy(new(netmap.PlacementPolicy))) + cid := container.CalculateID(cnr) + + addr := generateAddress() + addr.SetContainerID(cid) + + payloadSz := uint64(10) + payload := make([]byte, payloadSz) + _, _ = rand.Read(payload) + + obj := generateObject(addr, nil, payload) + + ns, as := testNodeMatrix(t, []int{2, 2}) + + c11 := newTestClient() + c11.addResult(addr, nil, errors.New("any error")) + + c12 := newTestClient() + c12.addResult(addr, nil, errors.New("any error")) + + c21 := newTestClient() + c21.addResult(addr, nil, errors.New("any error")) + + c22 := newTestClient() + c22.addResult(addr, obj, nil) + + svc := &Service{cfg: new(cfg)} + svc.log = test.NewLogger(false) + svc.localStorage = newTestStorage() + svc.assembly = true + + const curEpoch = 13 + + svc.traverserGenerator = &testTraverserGenerator{ + c: cnr, + b: map[uint64]placement.Builder{ + curEpoch: &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns[:1], + }, + }, + curEpoch - 1: &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns[1:], + }, + }, + }, + } + + svc.clientCache = &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c11, + as[0][1]: c12, + as[1][0]: c21, + as[1][1]: c22, + }, + } + + svc.currentEpochReceiver = testEpochReceiver(curEpoch) + + w := NewSimpleObjectWriter() + + commonPrm := new(util.CommonPrm) + + p := Prm{} + p.SetObjectWriter(w) + p.SetCommonParameters(commonPrm) + p.WithAddress(addr) + + err := svc.Get(ctx, p) + require.True(t, errors.Is(err, object.ErrNotFound)) + + commonPrm.SetNetmapLookupDepth(1) + + err = svc.Get(ctx, p) + require.NoError(t, err) + require.Equal(t, obj.Object(), w.Object()) + + rp := RangePrm{} + rp.SetChunkWriter(w) + commonPrm.SetNetmapLookupDepth(0) + rp.SetCommonParameters(commonPrm) + rp.WithAddress(addr) + + off, ln := payloadSz/3, payloadSz/3 + + r := objectSDK.NewRange() + r.SetOffset(off) + r.SetLength(ln) + + rp.SetRange(r) + + err = svc.GetRange(ctx, rp) + require.True(t, errors.Is(err, object.ErrNotFound)) + + w = NewSimpleObjectWriter() + rp.SetChunkWriter(w) + commonPrm.SetNetmapLookupDepth(1) + + err = svc.GetRange(ctx, rp) + require.NoError(t, err) + require.Equal(t, payload[off:off+ln], w.Object().Payload()) + + hp := HeadPrm{} + hp.SetHeaderWriter(w) + commonPrm.SetNetmapLookupDepth(0) + hp.SetCommonParameters(commonPrm) + hp.WithAddress(addr) + + err = svc.Head(ctx, hp) + require.True(t, errors.Is(err, object.ErrNotFound)) + + w = NewSimpleObjectWriter() + hp.SetHeaderWriter(w) + commonPrm.SetNetmapLookupDepth(1) + + err = svc.Head(ctx, hp) + require.NoError(t, err) + require.Equal(t, obj.CutPayload().Object(), w.Object()) +} diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index 24a23791..e0263985 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" @@ -40,7 +41,11 @@ type cfg struct { } traverserGenerator interface { - GenerateTraverser(*objectSDK.Address) (*placement.Traverser, error) + GenerateTraverser(*objectSDK.Address, uint64) (*placement.Traverser, error) + } + + currentEpochReceiver interface { + currentEpoch() (uint64, error) } } @@ -110,3 +115,13 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option { c.traverserGenerator = t } } + +// WithNetMapSource returns option to set network +// map storage to receive current network state. +func WithNetMapSource(nmSrc netmap.Source) Option { + return func(c *cfg) { + c.currentEpochReceiver = &nmSrcWrapper{ + nmSrc: nmSrc, + } + } +} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 26ea4f4f..a3354980 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" @@ -43,6 +44,10 @@ type hasherWrapper struct { hash io.Writer } +type nmSrcWrapper struct { + nmSrc netmap.Source +} + func NewSimpleObjectWriter() *SimpleObjectWriter { return &SimpleObjectWriter{ obj: object.NewRaw(), @@ -162,3 +167,7 @@ func (h *hasherWrapper) WriteChunk(p []byte) error { _, err := h.hash.Write(p) return err } + +func (n *nmSrcWrapper) currentEpoch() (uint64, error) { + return n.nmSrc.Epoch() +} diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index a40a3bf5..92fbd2fc 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -24,8 +24,13 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(getsvc.Prm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) @@ -45,8 +50,13 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(getsvc.RangePrm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) @@ -67,8 +77,13 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(getsvc.RangeHashPrm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) @@ -125,8 +140,13 @@ func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadRespon return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(getsvc.HeadPrm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index 959c8482..5d980fce 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -13,7 +13,12 @@ type streamer struct { func (s *streamer) Send(req *object.PutRequest) (err error) { switch v := req.GetBody().GetObjectPart().(type) { case *object.PutObjectPartInit: - if err = s.stream.Init(toInitPrm(v, req)); err != nil { + initPrm, err := toInitPrm(v, req) + if err != nil { + return err + } + + if err = s.stream.Init(initPrm); err != nil { err = errors.Wrapf(err, "(%T) could not init object put stream", s) } case *object.PutObjectPartChunk: diff --git a/pkg/services/object/put/v2/util.go b/pkg/services/object/put/v2/util.go index e4b7c9cb..b4235325 100644 --- a/pkg/services/object/put/v2/util.go +++ b/pkg/services/object/put/v2/util.go @@ -7,17 +7,22 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) -func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) *putsvc.PutInitPrm { +func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) { oV2 := new(objectV2.Object) oV2.SetObjectID(part.GetObjectID()) oV2.SetSignature(part.GetSignature()) oV2.SetHeader(part.GetHeader()) + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + return new(putsvc.PutInitPrm). WithObject( object.NewRawFromV2(oV2), ). - WithCommonPrm(util.CommonPrmFromV2(req)) + WithCommonPrm(commonPrm), nil } func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm { diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index b7f0eb80..25e3a4d2 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -12,17 +12,51 @@ func (exec *execCtx) executeOnContainer() { return } - exec.log.Debug("trying to execute in container...") + lookupDepth := exec.netmapLookupDepth() + + exec.log.Debug("trying to execute in container...", + zap.Uint64("netmap lookup depth", lookupDepth), + ) + + // initialize epoch number + ok := exec.initEpoch() + if !ok { + return + } + + for { + if exec.processCurrentEpoch() { + break + } + + // check the maximum depth has been reached + if lookupDepth == 0 { + break + } + + lookupDepth-- + + // go to the previous epoch + exec.curProcEpoch-- + } + + exec.status = statusOK + exec.err = nil +} + +func (exec *execCtx) processCurrentEpoch() bool { + exec.log.Debug("process epoch", + zap.Uint64("number", exec.curProcEpoch), + ) traverser, ok := exec.generateTraverser(exec.containerID()) if !ok { - return + return true } ctx, cancel := context.WithCancel(exec.context()) defer cancel() -loop: for { addrs := traverser.Next() if len(addrs) == 0 { @@ -36,7 +70,8 @@ loop: exec.log.Debug("interrupt placement iteration by context", zap.String("error", ctx.Err().Error()), ) - break loop + + return true default: } @@ -45,6 +80,5 @@ loop: } } - exec.status = statusOK - exec.err = nil + return false } diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index c6306f7f..3c7194bb 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" @@ -28,6 +29,8 @@ type execCtx struct { statusError log *logger.Logger + + curProcEpoch uint64 } const ( @@ -64,7 +67,9 @@ func (exec execCtx) key() *ecdsa.PrivateKey { } func (exec execCtx) callOptions() []client.CallOption { - return exec.prm.common.RemoteCallOptions() + return exec.prm.common.RemoteCallOptions( + util.WithNetmapEpoch(exec.curProcEpoch), + ) } func (exec execCtx) remotePrm() *client.SearchObjectParams { @@ -79,8 +84,40 @@ func (exec *execCtx) searchFilters() objectSDK.SearchFilters { return exec.prm.SearchFilters() } +func (exec *execCtx) netmapEpoch() uint64 { + return exec.prm.common.NetmapEpoch() +} + +func (exec *execCtx) netmapLookupDepth() uint64 { + return exec.prm.common.NetmapLookupDepth() +} + +func (exec *execCtx) initEpoch() bool { + exec.curProcEpoch = exec.netmapEpoch() + if exec.curProcEpoch > 0 { + return true + } + + e, err := exec.svc.currentEpochReceiver.currentEpoch() + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not get current epoch number", + zap.String("error", err.Error()), + ) + + return false + case err == nil: + exec.curProcEpoch = e + return true + } +} + func (exec *execCtx) generateTraverser(cid *container.ID) (*placement.Traverser, bool) { - t, err := exec.svc.traverserGenerator.generateTraverser(cid) + t, err := exec.svc.traverserGenerator.generateTraverser(cid, exec.curProcEpoch) switch { default: diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 20e2c128..3b7a7969 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -31,7 +31,7 @@ type testStorage struct { type testTraverserGenerator struct { c *container.Container - b placement.Builder + b map[uint64]placement.Builder } type testPlacementBuilder struct { @@ -46,6 +46,12 @@ type simpleIDWriter struct { ids []*objectSDK.ID } +type testEpochReceiver uint64 + +func (e testEpochReceiver) currentEpoch() (uint64, error) { + return uint64(e), nil +} + func (s *simpleIDWriter) WriteIDs(ids []*objectSDK.ID) error { s.ids = append(s.ids, ids...) return nil @@ -57,10 +63,10 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) generateTraverser(_ *container.ID) (*placement.Traverser, error) { +func (g *testTraverserGenerator) generateTraverser(_ *container.ID, epoch uint64) (*placement.Traverser, error) { return placement.NewTraverser( placement.ForContainer(g.c), - placement.UseBuilder(g.b), + placement.UseBuilder(g.b[epoch]), placement.WithoutSuccessTracking(), ) } @@ -71,7 +77,10 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return nil, errors.New("vectors for address not found") } - return vs, nil + res := make([]netmap.Nodes, len(vs)) + copy(res, vs) + + return res, nil } func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (searchClient, error) { @@ -217,41 +226,6 @@ func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) { return mNodes, mAddr } -// -// func generateChain(ln int, cid *container.ID) ([]*object.RawObject, []*objectSDK.ID, []byte) { -// curID := generateID() -// var prevID *objectSDK.ID -// -// addr := objectSDK.NewAddress() -// addr.SetContainerID(cid) -// -// res := make([]*object.RawObject, 0, ln) -// ids := make([]*objectSDK.ID, 0, ln) -// payload := make([]byte, 0, ln*10) -// -// for i := 0; i < ln; i++ { -// ids = append(ids, curID) -// addr.SetObjectID(curID) -// -// payloadPart := make([]byte, 10) -// rand.Read(payloadPart) -// -// o := generateObject(addr, prevID, []byte{byte(i)}) -// o.SetPayload(payloadPart) -// o.SetPayloadSize(uint64(len(payloadPart))) -// o.SetID(curID) -// -// payload = append(payload, payloadPart...) -// -// res = append(res, o) -// -// prevID = curID -// curID = generateID() -// } -// -// return res, ids, payload -// } - func TestGetRemoteSmall(t *testing.T) { ctx := context.Background() @@ -276,11 +250,16 @@ func TestGetRemoteSmall(t *testing.T) { svc.log = test.NewLogger(false) svc.localStorage = newTestStorage() + const curEpoch = 13 + svc.traverserGenerator = &testTraverserGenerator{ c: cnr, - b: b, + b: map[uint64]placement.Builder{ + curEpoch: b, + }, } svc.clientCache = c + svc.currentEpochReceiver = testEpochReceiver(curEpoch) return svc } @@ -334,3 +313,113 @@ func TestGetRemoteSmall(t *testing.T) { } }) } + +func TestGetFromPastEpoch(t *testing.T) { + ctx := context.Background() + + placementDim := []int{2, 2} + + rs := make([]*netmap.Replica, 0, len(placementDim)) + + for i := range placementDim { + r := netmap.NewReplica() + r.SetCount(uint32(placementDim[i])) + + rs = append(rs, r) + } + + pp := netmap.NewPlacementPolicy() + pp.SetReplicas(rs...) + + cnr := container.New(container.WithPolicy(pp)) + cid := container.CalculateID(cnr) + + addr := objectSDK.NewAddress() + addr.SetContainerID(cid) + + ns, as := testNodeMatrix(t, placementDim) + + c11 := newTestStorage() + ids11 := generateIDs(10) + c11.addResult(cid, ids11, nil) + + c12 := newTestStorage() + ids12 := generateIDs(10) + c12.addResult(cid, ids12, nil) + + c21 := newTestStorage() + ids21 := generateIDs(10) + c21.addResult(cid, ids21, nil) + + c22 := newTestStorage() + ids22 := generateIDs(10) + c22.addResult(cid, ids22, nil) + + svc := &Service{cfg: new(cfg)} + svc.log = test.NewLogger(false) + svc.localStorage = newTestStorage() + + const curEpoch = 13 + + svc.traverserGenerator = &testTraverserGenerator{ + c: cnr, + b: map[uint64]placement.Builder{ + curEpoch: &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns[:1], + }, + }, + curEpoch - 1: &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns[1:], + }, + }, + }, + } + + svc.clientCache = &testClientCache{ + clients: map[string]*testStorage{ + as[0][0]: c11, + as[0][1]: c12, + as[1][0]: c21, + as[1][1]: c22, + }, + } + + svc.currentEpochReceiver = testEpochReceiver(curEpoch) + + w := new(simpleIDWriter) + + p := Prm{} + p.WithContainerID(cid) + p.SetWriter(w) + + commonPrm := new(util.CommonPrm) + p.SetCommonParameters(commonPrm) + + assertContains := func(idsList ...[]*objectSDK.ID) { + var sz int + + for _, ids := range idsList { + sz += len(ids) + + for _, id := range ids { + require.Contains(t, w.ids, id) + } + } + + require.Len(t, w.ids, sz) + } + + err := svc.Search(ctx, p) + require.NoError(t, err) + assertContains(ids11, ids12) + + commonPrm.SetNetmapLookupDepth(1) + w = new(simpleIDWriter) + p.SetWriter(w) + + err = svc.Search(ctx, p) + require.NoError(t, err) + assertContains(ids11, ids12, ids21, ids22) +} diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index f90311a7..ccaa4daf 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -39,7 +40,11 @@ type cfg struct { } traverserGenerator interface { - generateTraverser(*container.ID) (*placement.Traverser, error) + generateTraverser(*container.ID, uint64) (*placement.Traverser, error) + } + + currentEpochReceiver interface { + currentEpoch() (uint64, error) } } @@ -100,3 +105,13 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option { c.traverserGenerator = (*traverseGeneratorWrapper)(t) } } + +// WithNetMapSource returns option to set network +// map storage to receive current network state. +func WithNetMapSource(nmSrc netmap.Source) Option { + return func(c *cfg) { + c.currentEpochReceiver = &nmSrcWrapper{ + nmSrc: nmSrc, + } + } +} diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index 36c8e8fa..a59cfebf 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -35,6 +36,10 @@ type storageEngineWrapper engine.StorageEngine type traverseGeneratorWrapper util.TraverserGenerator +type nmSrcWrapper struct { + nmSrc netmap.Source +} + func newUniqueAddressWriter(w IDListWriter) IDListWriter { return &uniqueIDWriter{ written: make(map[string]struct{}), @@ -102,9 +107,13 @@ func idsFromAddresses(addrs []*objectSDK.Address) []*objectSDK.ID { return ids } -func (e *traverseGeneratorWrapper) generateTraverser(cid *container.ID) (*placement.Traverser, error) { +func (e *traverseGeneratorWrapper) generateTraverser(cid *container.ID, epoch uint64) (*placement.Traverser, error) { a := objectSDK.NewAddress() a.SetContainerID(cid) - return (*util.TraverserGenerator)(e).GenerateTraverser(a) + return (*util.TraverserGenerator)(e).GenerateTraverser(a, epoch) +} + +func (n *nmSrcWrapper) currentEpoch() (uint64, error) { + return n.nmSrc.Epoch() } diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index 38193879..6e59921f 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -18,8 +18,13 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(searchsvc.Prm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) diff --git a/pkg/services/object/util/placement.go b/pkg/services/object/util/placement.go index 33be0872..393c88b0 100644 --- a/pkg/services/object/util/placement.go +++ b/pkg/services/object/util/placement.go @@ -116,12 +116,13 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav } } -// GenerateTraverser generates placement Traverser for provided object address. -func (g *TraverserGenerator) GenerateTraverser(addr *object.Address) (*placement.Traverser, error) { - // get latest network map - nm, err := netmap.GetLatestNetworkMap(g.netMapSrc) +// GenerateTraverser generates placement Traverser for provided object address +// using epoch-th network map. +func (g *TraverserGenerator) GenerateTraverser(addr *object.Address, epoch uint64) (*placement.Traverser, error) { + // get network map by epoch + nm, err := g.netMapSrc.GetNetMapByEpoch(epoch) if err != nil { - return nil, errors.Wrapf(err, "could not get latest network map") + return nil, errors.Wrapf(err, "could not get network map #%d", epoch) } // get container related container diff --git a/pkg/services/object/util/prm.go b/pkg/services/object/util/prm.go index a34f3e97..f8b4eb71 100644 --- a/pkg/services/object/util/prm.go +++ b/pkg/services/object/util/prm.go @@ -2,6 +2,7 @@ package util import ( "crypto/ecdsa" + "strconv" "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -12,6 +13,8 @@ import ( type CommonPrm struct { local bool + netmapEpoch, netmapLookupDepth uint64 + token *token.SessionToken bearer *token.BearerToken @@ -21,6 +24,12 @@ type CommonPrm struct { callOpts []client.CallOption } +type remoteCallOpts struct { + opts []client.CallOption +} + +type DynamicCallOption func(*remoteCallOpts) + func (p *CommonPrm) WithLocalOnly(v bool) *CommonPrm { if p != nil { p.local = v @@ -81,14 +90,32 @@ func (p *CommonPrm) WithRemoteCallOptions(opts ...client.CallOption) *CommonPrm } // RemoteCallOptions return call options for remote client calls. -func (p *CommonPrm) RemoteCallOptions() []client.CallOption { +func (p *CommonPrm) RemoteCallOptions(dynamic ...DynamicCallOption) []client.CallOption { if p != nil { - return p.callOpts + o := &remoteCallOpts{ + opts: p.callOpts, + } + + for _, applier := range dynamic { + applier(o) + } + + return o.opts } return nil } +func WithNetmapEpoch(v uint64) DynamicCallOption { + return func(o *remoteCallOpts) { + xHdr := pkg.NewXHeader() + xHdr.SetKey(session.XHeaderNetmapEpoch) + xHdr.SetValue(strconv.FormatUint(v, 10)) + + o.opts = append(o.opts, client.WithXHeader(xHdr)) + } +} + func (p *CommonPrm) SessionToken() *token.SessionToken { if p != nil { return p.token @@ -105,9 +132,31 @@ func (p *CommonPrm) BearerToken() *token.BearerToken { return nil } +func (p *CommonPrm) NetmapEpoch() uint64 { + if p != nil { + return p.netmapEpoch + } + + return 0 +} + +func (p *CommonPrm) NetmapLookupDepth() uint64 { + if p != nil { + return p.netmapLookupDepth + } + + return 0 +} + +func (p *CommonPrm) SetNetmapLookupDepth(v uint64) { + if p != nil { + p.netmapLookupDepth = v + } +} + func CommonPrmFromV2(req interface { GetMetaHeader() *session.RequestMetaHeader -}) *CommonPrm { +}) (*CommonPrm, error) { meta := req.GetMetaHeader() xHdrs := meta.GetXHeaders() @@ -134,12 +183,29 @@ func CommonPrmFromV2(req interface { } for i := range xHdrs { - prm.callOpts = append(prm.callOpts, - client.WithXHeader( - pkg.NewXHeaderFromV2(xHdrs[i]), - ), - ) + switch xHdrs[i].GetKey() { + case session.XHeaderNetmapEpoch: + var err error + + prm.netmapEpoch, err = strconv.ParseUint(xHdrs[i].GetValue(), 10, 64) + if err != nil { + return nil, err + } + case session.XHeaderNetmapLookupDepth: + var err error + + prm.netmapLookupDepth, err = strconv.ParseUint(xHdrs[i].GetValue(), 10, 64) + if err != nil { + return nil, err + } + default: + prm.callOpts = append(prm.callOpts, + client.WithXHeader( + pkg.NewXHeaderFromV2(xHdrs[i]), + ), + ) + } } - return prm + return prm, nil }