diff --git a/cmd/neofs-cli/modules/object.go b/cmd/neofs-cli/modules/object.go index 1ef48855..8df0848d 100644 --- a/cmd/neofs-cli/modules/object.go +++ b/cmd/neofs-cli/modules/object.go @@ -264,7 +264,7 @@ func deleteObject(cmd *cobra.Command, _ []string) error { return err } - tombstoneAddr, err := client.DeleteObject(cli, ctx, + tombstoneAddr, err := client.DeleteObject(ctx, cli, new(client.DeleteObjectParams).WithAddress(objAddr), append(globalCallOptions(), client.WithSession(tok), diff --git a/cmd/neofs-cli/modules/storagegroup.go b/cmd/neofs-cli/modules/storagegroup.go index 1626e37c..cd76ed9a 100644 --- a/cmd/neofs-cli/modules/storagegroup.go +++ b/cmd/neofs-cli/modules/storagegroup.go @@ -289,7 +289,7 @@ func delSG(cmd *cobra.Command, _ []string) error { addr.SetContainerID(cid) addr.SetObjectID(id) - tombstone, err := client.DeleteObject(cli, ctx, + tombstone, err := client.DeleteObject(ctx, cli, new(client.DeleteObjectParams). WithAddress(addr), client.WithSession(tok)) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 223da2e1..715cc83a 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -242,6 +242,7 @@ func initObjectService(c *cfg) { placement.WithoutSuccessTracking(), ), ), + searchsvc.WithNetMapSource(c.cfgNetmap.wrapper), ) sSearchV2 := searchsvcV2.NewService( @@ -261,6 +262,7 @@ func initObjectService(c *cfg) { placement.SuccessAfter(1), ), ), + getsvc.WithNetMapSource(c.cfgNetmap.wrapper), ) sGetV2 := getsvcV2.NewService( diff --git a/go.mod b/go.mod index ba472fe7..f0501e41 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.92.0 - github.com/nspcc-dev/neofs-api-go v1.22.0 + github.com/nspcc-dev/neofs-api-go v1.22.1-0.20210112152207-43c579f6704d github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index d2f3b3cb..29b9824f 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/services/object/delete/v2/util.go b/pkg/services/object/delete/v2/util.go index 4b618290..1e08c9c2 100644 --- a/pkg/services/object/delete/v2/util.go +++ b/pkg/services/object/delete/v2/util.go @@ -20,8 +20,13 @@ func (s *Service) toPrm(req *objectV2.DeleteRequest, respBody *objectV2.DeleteRe return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(deletesvc.Prm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index d964a269..62e070c4 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -12,11 +12,43 @@ func (exec *execCtx) executeOnContainer() { return } - exec.log.Debug("trying to execute in container...") + lookupDepth := exec.netmapLookupDepth() + + exec.log.Debug("trying to execute in container...", + zap.Uint64("netmap lookup depth", lookupDepth), + ) + + // initialize epoch number + ok := exec.initEpoch() + if !ok { + return + } + + for { + if exec.processCurrentEpoch() { + break + } + + // check the maximum depth has been reached + if lookupDepth == 0 { + break + } + + lookupDepth-- + + // go to the previous epoch + exec.curProcEpoch-- + } +} + +func (exec *execCtx) processCurrentEpoch() bool { + exec.log.Debug("process epoch", + zap.Uint64("number", exec.curProcEpoch), + ) traverser, ok := exec.generateTraverser(exec.address()) if !ok { - return + return true } ctx, cancel := context.WithCancel(exec.context()) @@ -24,12 +56,12 @@ func (exec *execCtx) executeOnContainer() { exec.status = statusUndefined -loop: for { addrs := traverser.Next() if len(addrs) == 0 { exec.log.Debug("no more nodes, abort placement iteration") - break + + return false } for i := range addrs { @@ -38,7 +70,8 @@ loop: exec.log.Debug("interrupt placement iteration by context", zap.String("error", ctx.Err().Error()), ) - break loop + + return true default: } @@ -47,7 +80,7 @@ loop: // we reach the best result - split info with linking object ID. if exec.processNode(ctx, addrs[i]) { exec.log.Debug("completing the operation") - break loop + return true } } } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 05f8ff89..900c0cdd 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -10,6 +10,7 @@ import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" @@ -38,6 +39,8 @@ type execCtx struct { curOff uint64 head bool + + curProcEpoch uint64 } type execOption func(*execCtx) @@ -106,7 +109,9 @@ func (exec execCtx) key() *ecdsa.PrivateKey { } func (exec execCtx) callOptions() []client.CallOption { - return exec.prm.common.RemoteCallOptions() + return exec.prm.common.RemoteCallOptions( + util.WithNetmapEpoch(exec.curProcEpoch), + ) } func (exec execCtx) remotePrm() *client.GetObjectParams { @@ -135,8 +140,40 @@ func (exec *execCtx) headOnly() bool { return exec.head } +func (exec *execCtx) netmapEpoch() uint64 { + return exec.prm.common.NetmapEpoch() +} + +func (exec *execCtx) netmapLookupDepth() uint64 { + return exec.prm.common.NetmapLookupDepth() +} + +func (exec *execCtx) initEpoch() bool { + exec.curProcEpoch = exec.netmapEpoch() + if exec.curProcEpoch > 0 { + return true + } + + e, err := exec.svc.currentEpochReceiver.currentEpoch() + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not get current epoch number", + zap.String("error", err.Error()), + ) + + return false + case err == nil: + exec.curProcEpoch = e + return true + } +} + func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) { - t, err := exec.svc.traverserGenerator.GenerateTraverser(addr) + t, err := exec.svc.traverserGenerator.GenerateTraverser(addr, exec.curProcEpoch) switch { default: diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 63bea462..f5df9cdf 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -31,7 +31,7 @@ type testStorage struct { type testTraverserGenerator struct { c *container.Container - b placement.Builder + b map[uint64]placement.Builder } type testPlacementBuilder struct { @@ -49,6 +49,12 @@ type testClient struct { } } +type testEpochReceiver uint64 + +func (e testEpochReceiver) currentEpoch() (uint64, error) { + return uint64(e), nil +} + func newTestStorage() *testStorage { return &testStorage{ inhumed: make(map[string]struct{}), @@ -57,11 +63,11 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(addr *objectSDK.Address) (*placement.Traverser, error) { +func (g *testTraverserGenerator) GenerateTraverser(addr *objectSDK.Address, e uint64) (*placement.Traverser, error) { return placement.NewTraverser( placement.ForContainer(g.c), placement.ForObject(addr.ObjectID()), - placement.UseBuilder(g.b), + placement.UseBuilder(g.b[e]), placement.SuccessAfter(1), ) } @@ -467,11 +473,17 @@ func TestGetRemoteSmall(t *testing.T) { svc.log = test.NewLogger(false) svc.localStorage = newTestStorage() svc.assembly = true + + const curEpoch = 13 + svc.traverserGenerator = &testTraverserGenerator{ c: cnr, - b: b, + b: map[uint64]placement.Builder{ + curEpoch: b, + }, } svc.clientCache = c + svc.currentEpochReceiver = testEpochReceiver(curEpoch) return svc } @@ -1095,3 +1107,127 @@ func TestGetRemoteSmall(t *testing.T) { }) }) } + +func TestGetFromPastEpoch(t *testing.T) { + ctx := context.Background() + + cnr := container.New(container.WithPolicy(new(netmap.PlacementPolicy))) + cid := container.CalculateID(cnr) + + addr := generateAddress() + addr.SetContainerID(cid) + + payloadSz := uint64(10) + payload := make([]byte, payloadSz) + _, _ = rand.Read(payload) + + obj := generateObject(addr, nil, payload) + + ns, as := testNodeMatrix(t, []int{2, 2}) + + c11 := newTestClient() + c11.addResult(addr, nil, errors.New("any error")) + + c12 := newTestClient() + c12.addResult(addr, nil, errors.New("any error")) + + c21 := newTestClient() + c21.addResult(addr, nil, errors.New("any error")) + + c22 := newTestClient() + c22.addResult(addr, obj, nil) + + svc := &Service{cfg: new(cfg)} + svc.log = test.NewLogger(false) + svc.localStorage = newTestStorage() + svc.assembly = true + + const curEpoch = 13 + + svc.traverserGenerator = &testTraverserGenerator{ + c: cnr, + b: map[uint64]placement.Builder{ + curEpoch: &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns[:1], + }, + }, + curEpoch - 1: &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns[1:], + }, + }, + }, + } + + svc.clientCache = &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c11, + as[0][1]: c12, + as[1][0]: c21, + as[1][1]: c22, + }, + } + + svc.currentEpochReceiver = testEpochReceiver(curEpoch) + + w := NewSimpleObjectWriter() + + commonPrm := new(util.CommonPrm) + + p := Prm{} + p.SetObjectWriter(w) + p.SetCommonParameters(commonPrm) + p.WithAddress(addr) + + err := svc.Get(ctx, p) + require.True(t, errors.Is(err, object.ErrNotFound)) + + commonPrm.SetNetmapLookupDepth(1) + + err = svc.Get(ctx, p) + require.NoError(t, err) + require.Equal(t, obj.Object(), w.Object()) + + rp := RangePrm{} + rp.SetChunkWriter(w) + commonPrm.SetNetmapLookupDepth(0) + rp.SetCommonParameters(commonPrm) + rp.WithAddress(addr) + + off, ln := payloadSz/3, payloadSz/3 + + r := objectSDK.NewRange() + r.SetOffset(off) + r.SetLength(ln) + + rp.SetRange(r) + + err = svc.GetRange(ctx, rp) + require.True(t, errors.Is(err, object.ErrNotFound)) + + w = NewSimpleObjectWriter() + rp.SetChunkWriter(w) + commonPrm.SetNetmapLookupDepth(1) + + err = svc.GetRange(ctx, rp) + require.NoError(t, err) + require.Equal(t, payload[off:off+ln], w.Object().Payload()) + + hp := HeadPrm{} + hp.SetHeaderWriter(w) + commonPrm.SetNetmapLookupDepth(0) + hp.SetCommonParameters(commonPrm) + hp.WithAddress(addr) + + err = svc.Head(ctx, hp) + require.True(t, errors.Is(err, object.ErrNotFound)) + + w = NewSimpleObjectWriter() + hp.SetHeaderWriter(w) + commonPrm.SetNetmapLookupDepth(1) + + err = svc.Head(ctx, hp) + require.NoError(t, err) + require.Equal(t, obj.CutPayload().Object(), w.Object()) +} diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index 24a23791..e0263985 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" @@ -40,7 +41,11 @@ type cfg struct { } traverserGenerator interface { - GenerateTraverser(*objectSDK.Address) (*placement.Traverser, error) + GenerateTraverser(*objectSDK.Address, uint64) (*placement.Traverser, error) + } + + currentEpochReceiver interface { + currentEpoch() (uint64, error) } } @@ -110,3 +115,13 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option { c.traverserGenerator = t } } + +// WithNetMapSource returns option to set network +// map storage to receive current network state. +func WithNetMapSource(nmSrc netmap.Source) Option { + return func(c *cfg) { + c.currentEpochReceiver = &nmSrcWrapper{ + nmSrc: nmSrc, + } + } +} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 26ea4f4f..a3354980 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" @@ -43,6 +44,10 @@ type hasherWrapper struct { hash io.Writer } +type nmSrcWrapper struct { + nmSrc netmap.Source +} + func NewSimpleObjectWriter() *SimpleObjectWriter { return &SimpleObjectWriter{ obj: object.NewRaw(), @@ -162,3 +167,7 @@ func (h *hasherWrapper) WriteChunk(p []byte) error { _, err := h.hash.Write(p) return err } + +func (n *nmSrcWrapper) currentEpoch() (uint64, error) { + return n.nmSrc.Epoch() +} diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index a40a3bf5..92fbd2fc 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -24,8 +24,13 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(getsvc.Prm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) @@ -45,8 +50,13 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(getsvc.RangePrm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) @@ -67,8 +77,13 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(getsvc.RangeHashPrm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) @@ -125,8 +140,13 @@ func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadRespon return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(getsvc.HeadPrm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index 959c8482..5d980fce 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -13,7 +13,12 @@ type streamer struct { func (s *streamer) Send(req *object.PutRequest) (err error) { switch v := req.GetBody().GetObjectPart().(type) { case *object.PutObjectPartInit: - if err = s.stream.Init(toInitPrm(v, req)); err != nil { + initPrm, err := toInitPrm(v, req) + if err != nil { + return err + } + + if err = s.stream.Init(initPrm); err != nil { err = errors.Wrapf(err, "(%T) could not init object put stream", s) } case *object.PutObjectPartChunk: diff --git a/pkg/services/object/put/v2/util.go b/pkg/services/object/put/v2/util.go index e4b7c9cb..b4235325 100644 --- a/pkg/services/object/put/v2/util.go +++ b/pkg/services/object/put/v2/util.go @@ -7,17 +7,22 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) -func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) *putsvc.PutInitPrm { +func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) { oV2 := new(objectV2.Object) oV2.SetObjectID(part.GetObjectID()) oV2.SetSignature(part.GetSignature()) oV2.SetHeader(part.GetHeader()) + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + return new(putsvc.PutInitPrm). WithObject( object.NewRawFromV2(oV2), ). - WithCommonPrm(util.CommonPrmFromV2(req)) + WithCommonPrm(commonPrm), nil } func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm { diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index b7f0eb80..25e3a4d2 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -12,17 +12,51 @@ func (exec *execCtx) executeOnContainer() { return } - exec.log.Debug("trying to execute in container...") + lookupDepth := exec.netmapLookupDepth() + + exec.log.Debug("trying to execute in container...", + zap.Uint64("netmap lookup depth", lookupDepth), + ) + + // initialize epoch number + ok := exec.initEpoch() + if !ok { + return + } + + for { + if exec.processCurrentEpoch() { + break + } + + // check the maximum depth has been reached + if lookupDepth == 0 { + break + } + + lookupDepth-- + + // go to the previous epoch + exec.curProcEpoch-- + } + + exec.status = statusOK + exec.err = nil +} + +func (exec *execCtx) processCurrentEpoch() bool { + exec.log.Debug("process epoch", + zap.Uint64("number", exec.curProcEpoch), + ) traverser, ok := exec.generateTraverser(exec.containerID()) if !ok { - return + return true } ctx, cancel := context.WithCancel(exec.context()) defer cancel() -loop: for { addrs := traverser.Next() if len(addrs) == 0 { @@ -36,7 +70,8 @@ loop: exec.log.Debug("interrupt placement iteration by context", zap.String("error", ctx.Err().Error()), ) - break loop + + return true default: } @@ -45,6 +80,5 @@ loop: } } - exec.status = statusOK - exec.err = nil + return false } diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index c6306f7f..3c7194bb 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" @@ -28,6 +29,8 @@ type execCtx struct { statusError log *logger.Logger + + curProcEpoch uint64 } const ( @@ -64,7 +67,9 @@ func (exec execCtx) key() *ecdsa.PrivateKey { } func (exec execCtx) callOptions() []client.CallOption { - return exec.prm.common.RemoteCallOptions() + return exec.prm.common.RemoteCallOptions( + util.WithNetmapEpoch(exec.curProcEpoch), + ) } func (exec execCtx) remotePrm() *client.SearchObjectParams { @@ -79,8 +84,40 @@ func (exec *execCtx) searchFilters() objectSDK.SearchFilters { return exec.prm.SearchFilters() } +func (exec *execCtx) netmapEpoch() uint64 { + return exec.prm.common.NetmapEpoch() +} + +func (exec *execCtx) netmapLookupDepth() uint64 { + return exec.prm.common.NetmapLookupDepth() +} + +func (exec *execCtx) initEpoch() bool { + exec.curProcEpoch = exec.netmapEpoch() + if exec.curProcEpoch > 0 { + return true + } + + e, err := exec.svc.currentEpochReceiver.currentEpoch() + + switch { + default: + exec.status = statusUndefined + exec.err = err + + exec.log.Debug("could not get current epoch number", + zap.String("error", err.Error()), + ) + + return false + case err == nil: + exec.curProcEpoch = e + return true + } +} + func (exec *execCtx) generateTraverser(cid *container.ID) (*placement.Traverser, bool) { - t, err := exec.svc.traverserGenerator.generateTraverser(cid) + t, err := exec.svc.traverserGenerator.generateTraverser(cid, exec.curProcEpoch) switch { default: diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 20e2c128..3b7a7969 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -31,7 +31,7 @@ type testStorage struct { type testTraverserGenerator struct { c *container.Container - b placement.Builder + b map[uint64]placement.Builder } type testPlacementBuilder struct { @@ -46,6 +46,12 @@ type simpleIDWriter struct { ids []*objectSDK.ID } +type testEpochReceiver uint64 + +func (e testEpochReceiver) currentEpoch() (uint64, error) { + return uint64(e), nil +} + func (s *simpleIDWriter) WriteIDs(ids []*objectSDK.ID) error { s.ids = append(s.ids, ids...) return nil @@ -57,10 +63,10 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) generateTraverser(_ *container.ID) (*placement.Traverser, error) { +func (g *testTraverserGenerator) generateTraverser(_ *container.ID, epoch uint64) (*placement.Traverser, error) { return placement.NewTraverser( placement.ForContainer(g.c), - placement.UseBuilder(g.b), + placement.UseBuilder(g.b[epoch]), placement.WithoutSuccessTracking(), ) } @@ -71,7 +77,10 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return nil, errors.New("vectors for address not found") } - return vs, nil + res := make([]netmap.Nodes, len(vs)) + copy(res, vs) + + return res, nil } func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (searchClient, error) { @@ -217,41 +226,6 @@ func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) { return mNodes, mAddr } -// -// func generateChain(ln int, cid *container.ID) ([]*object.RawObject, []*objectSDK.ID, []byte) { -// curID := generateID() -// var prevID *objectSDK.ID -// -// addr := objectSDK.NewAddress() -// addr.SetContainerID(cid) -// -// res := make([]*object.RawObject, 0, ln) -// ids := make([]*objectSDK.ID, 0, ln) -// payload := make([]byte, 0, ln*10) -// -// for i := 0; i < ln; i++ { -// ids = append(ids, curID) -// addr.SetObjectID(curID) -// -// payloadPart := make([]byte, 10) -// rand.Read(payloadPart) -// -// o := generateObject(addr, prevID, []byte{byte(i)}) -// o.SetPayload(payloadPart) -// o.SetPayloadSize(uint64(len(payloadPart))) -// o.SetID(curID) -// -// payload = append(payload, payloadPart...) -// -// res = append(res, o) -// -// prevID = curID -// curID = generateID() -// } -// -// return res, ids, payload -// } - func TestGetRemoteSmall(t *testing.T) { ctx := context.Background() @@ -276,11 +250,16 @@ func TestGetRemoteSmall(t *testing.T) { svc.log = test.NewLogger(false) svc.localStorage = newTestStorage() + const curEpoch = 13 + svc.traverserGenerator = &testTraverserGenerator{ c: cnr, - b: b, + b: map[uint64]placement.Builder{ + curEpoch: b, + }, } svc.clientCache = c + svc.currentEpochReceiver = testEpochReceiver(curEpoch) return svc } @@ -334,3 +313,113 @@ func TestGetRemoteSmall(t *testing.T) { } }) } + +func TestGetFromPastEpoch(t *testing.T) { + ctx := context.Background() + + placementDim := []int{2, 2} + + rs := make([]*netmap.Replica, 0, len(placementDim)) + + for i := range placementDim { + r := netmap.NewReplica() + r.SetCount(uint32(placementDim[i])) + + rs = append(rs, r) + } + + pp := netmap.NewPlacementPolicy() + pp.SetReplicas(rs...) + + cnr := container.New(container.WithPolicy(pp)) + cid := container.CalculateID(cnr) + + addr := objectSDK.NewAddress() + addr.SetContainerID(cid) + + ns, as := testNodeMatrix(t, placementDim) + + c11 := newTestStorage() + ids11 := generateIDs(10) + c11.addResult(cid, ids11, nil) + + c12 := newTestStorage() + ids12 := generateIDs(10) + c12.addResult(cid, ids12, nil) + + c21 := newTestStorage() + ids21 := generateIDs(10) + c21.addResult(cid, ids21, nil) + + c22 := newTestStorage() + ids22 := generateIDs(10) + c22.addResult(cid, ids22, nil) + + svc := &Service{cfg: new(cfg)} + svc.log = test.NewLogger(false) + svc.localStorage = newTestStorage() + + const curEpoch = 13 + + svc.traverserGenerator = &testTraverserGenerator{ + c: cnr, + b: map[uint64]placement.Builder{ + curEpoch: &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns[:1], + }, + }, + curEpoch - 1: &testPlacementBuilder{ + vectors: map[string][]netmap.Nodes{ + addr.String(): ns[1:], + }, + }, + }, + } + + svc.clientCache = &testClientCache{ + clients: map[string]*testStorage{ + as[0][0]: c11, + as[0][1]: c12, + as[1][0]: c21, + as[1][1]: c22, + }, + } + + svc.currentEpochReceiver = testEpochReceiver(curEpoch) + + w := new(simpleIDWriter) + + p := Prm{} + p.WithContainerID(cid) + p.SetWriter(w) + + commonPrm := new(util.CommonPrm) + p.SetCommonParameters(commonPrm) + + assertContains := func(idsList ...[]*objectSDK.ID) { + var sz int + + for _, ids := range idsList { + sz += len(ids) + + for _, id := range ids { + require.Contains(t, w.ids, id) + } + } + + require.Len(t, w.ids, sz) + } + + err := svc.Search(ctx, p) + require.NoError(t, err) + assertContains(ids11, ids12) + + commonPrm.SetNetmapLookupDepth(1) + w = new(simpleIDWriter) + p.SetWriter(w) + + err = svc.Search(ctx, p) + require.NoError(t, err) + assertContains(ids11, ids12, ids21, ids22) +} diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index f90311a7..ccaa4daf 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -39,7 +40,11 @@ type cfg struct { } traverserGenerator interface { - generateTraverser(*container.ID) (*placement.Traverser, error) + generateTraverser(*container.ID, uint64) (*placement.Traverser, error) + } + + currentEpochReceiver interface { + currentEpoch() (uint64, error) } } @@ -100,3 +105,13 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option { c.traverserGenerator = (*traverseGeneratorWrapper)(t) } } + +// WithNetMapSource returns option to set network +// map storage to receive current network state. +func WithNetMapSource(nmSrc netmap.Source) Option { + return func(c *cfg) { + c.currentEpochReceiver = &nmSrcWrapper{ + nmSrc: nmSrc, + } + } +} diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index 36c8e8fa..a59cfebf 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -35,6 +36,10 @@ type storageEngineWrapper engine.StorageEngine type traverseGeneratorWrapper util.TraverserGenerator +type nmSrcWrapper struct { + nmSrc netmap.Source +} + func newUniqueAddressWriter(w IDListWriter) IDListWriter { return &uniqueIDWriter{ written: make(map[string]struct{}), @@ -102,9 +107,13 @@ func idsFromAddresses(addrs []*objectSDK.Address) []*objectSDK.ID { return ids } -func (e *traverseGeneratorWrapper) generateTraverser(cid *container.ID) (*placement.Traverser, error) { +func (e *traverseGeneratorWrapper) generateTraverser(cid *container.ID, epoch uint64) (*placement.Traverser, error) { a := objectSDK.NewAddress() a.SetContainerID(cid) - return (*util.TraverserGenerator)(e).GenerateTraverser(a) + return (*util.TraverserGenerator)(e).GenerateTraverser(a, epoch) +} + +func (n *nmSrcWrapper) currentEpoch() (uint64, error) { + return n.nmSrc.Epoch() } diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index 38193879..6e59921f 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -18,8 +18,13 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return nil, err } + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + p := new(searchsvc.Prm) - p.SetCommonParameters(util.CommonPrmFromV2(req). + p.SetCommonParameters(commonPrm. WithPrivateKey(key), ) diff --git a/pkg/services/object/util/placement.go b/pkg/services/object/util/placement.go index 33be0872..393c88b0 100644 --- a/pkg/services/object/util/placement.go +++ b/pkg/services/object/util/placement.go @@ -116,12 +116,13 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav } } -// GenerateTraverser generates placement Traverser for provided object address. -func (g *TraverserGenerator) GenerateTraverser(addr *object.Address) (*placement.Traverser, error) { - // get latest network map - nm, err := netmap.GetLatestNetworkMap(g.netMapSrc) +// GenerateTraverser generates placement Traverser for provided object address +// using epoch-th network map. +func (g *TraverserGenerator) GenerateTraverser(addr *object.Address, epoch uint64) (*placement.Traverser, error) { + // get network map by epoch + nm, err := g.netMapSrc.GetNetMapByEpoch(epoch) if err != nil { - return nil, errors.Wrapf(err, "could not get latest network map") + return nil, errors.Wrapf(err, "could not get network map #%d", epoch) } // get container related container diff --git a/pkg/services/object/util/prm.go b/pkg/services/object/util/prm.go index a34f3e97..f8b4eb71 100644 --- a/pkg/services/object/util/prm.go +++ b/pkg/services/object/util/prm.go @@ -2,6 +2,7 @@ package util import ( "crypto/ecdsa" + "strconv" "github.com/nspcc-dev/neofs-api-go/pkg" "github.com/nspcc-dev/neofs-api-go/pkg/client" @@ -12,6 +13,8 @@ import ( type CommonPrm struct { local bool + netmapEpoch, netmapLookupDepth uint64 + token *token.SessionToken bearer *token.BearerToken @@ -21,6 +24,12 @@ type CommonPrm struct { callOpts []client.CallOption } +type remoteCallOpts struct { + opts []client.CallOption +} + +type DynamicCallOption func(*remoteCallOpts) + func (p *CommonPrm) WithLocalOnly(v bool) *CommonPrm { if p != nil { p.local = v @@ -81,14 +90,32 @@ func (p *CommonPrm) WithRemoteCallOptions(opts ...client.CallOption) *CommonPrm } // RemoteCallOptions return call options for remote client calls. -func (p *CommonPrm) RemoteCallOptions() []client.CallOption { +func (p *CommonPrm) RemoteCallOptions(dynamic ...DynamicCallOption) []client.CallOption { if p != nil { - return p.callOpts + o := &remoteCallOpts{ + opts: p.callOpts, + } + + for _, applier := range dynamic { + applier(o) + } + + return o.opts } return nil } +func WithNetmapEpoch(v uint64) DynamicCallOption { + return func(o *remoteCallOpts) { + xHdr := pkg.NewXHeader() + xHdr.SetKey(session.XHeaderNetmapEpoch) + xHdr.SetValue(strconv.FormatUint(v, 10)) + + o.opts = append(o.opts, client.WithXHeader(xHdr)) + } +} + func (p *CommonPrm) SessionToken() *token.SessionToken { if p != nil { return p.token @@ -105,9 +132,31 @@ func (p *CommonPrm) BearerToken() *token.BearerToken { return nil } +func (p *CommonPrm) NetmapEpoch() uint64 { + if p != nil { + return p.netmapEpoch + } + + return 0 +} + +func (p *CommonPrm) NetmapLookupDepth() uint64 { + if p != nil { + return p.netmapLookupDepth + } + + return 0 +} + +func (p *CommonPrm) SetNetmapLookupDepth(v uint64) { + if p != nil { + p.netmapLookupDepth = v + } +} + func CommonPrmFromV2(req interface { GetMetaHeader() *session.RequestMetaHeader -}) *CommonPrm { +}) (*CommonPrm, error) { meta := req.GetMetaHeader() xHdrs := meta.GetXHeaders() @@ -134,12 +183,29 @@ func CommonPrmFromV2(req interface { } for i := range xHdrs { - prm.callOpts = append(prm.callOpts, - client.WithXHeader( - pkg.NewXHeaderFromV2(xHdrs[i]), - ), - ) + switch xHdrs[i].GetKey() { + case session.XHeaderNetmapEpoch: + var err error + + prm.netmapEpoch, err = strconv.ParseUint(xHdrs[i].GetValue(), 10, 64) + if err != nil { + return nil, err + } + case session.XHeaderNetmapLookupDepth: + var err error + + prm.netmapLookupDepth, err = strconv.ParseUint(xHdrs[i].GetValue(), 10, 64) + if err != nil { + return nil, err + } + default: + prm.callOpts = append(prm.callOpts, + client.WithXHeader( + pkg.NewXHeaderFromV2(xHdrs[i]), + ), + ) + } } - return prm + return prm, nil }