[#234] services/object: Support netmap epoch and lookup dead in read ops

Support processing of NetmapEpoch and NetmapLookupDepth X-headers when
processing object read operations. Placement for operations
Get/Head/GetRange/GetRangeHash/Search is built for the epoch specified in
NetmapEpoch X-header (by default latest). Also the specified operations are
processed until success is achieved for network maps from the past up to
NetmapLookupDepth value. Behavior for default values (zero or missing) left
unchanged.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-01-12 17:55:02 +03:00 committed by Alex Vanin
parent 2f4d90025f
commit 1e170c3812
22 changed files with 613 additions and 90 deletions

View file

@ -264,7 +264,7 @@ func deleteObject(cmd *cobra.Command, _ []string) error {
return err
}
tombstoneAddr, err := client.DeleteObject(cli, ctx,
tombstoneAddr, err := client.DeleteObject(ctx, cli,
new(client.DeleteObjectParams).WithAddress(objAddr),
append(globalCallOptions(),
client.WithSession(tok),

View file

@ -289,7 +289,7 @@ func delSG(cmd *cobra.Command, _ []string) error {
addr.SetContainerID(cid)
addr.SetObjectID(id)
tombstone, err := client.DeleteObject(cli, ctx,
tombstone, err := client.DeleteObject(ctx, cli,
new(client.DeleteObjectParams).
WithAddress(addr),
client.WithSession(tok))

View file

@ -242,6 +242,7 @@ func initObjectService(c *cfg) {
placement.WithoutSuccessTracking(),
),
),
searchsvc.WithNetMapSource(c.cfgNetmap.wrapper),
)
sSearchV2 := searchsvcV2.NewService(
@ -261,6 +262,7 @@ func initObjectService(c *cfg) {
placement.SuccessAfter(1),
),
),
getsvc.WithNetMapSource(c.cfgNetmap.wrapper),
)
sGetV2 := getsvcV2.NewService(

2
go.mod
View file

@ -15,7 +15,7 @@ require (
github.com/multiformats/go-multihash v0.0.13 // indirect
github.com/nspcc-dev/hrw v1.0.9
github.com/nspcc-dev/neo-go v0.92.0
github.com/nspcc-dev/neofs-api-go v1.22.0
github.com/nspcc-dev/neofs-api-go v1.22.1-0.20210112152207-43c579f6704d
github.com/nspcc-dev/neofs-crypto v0.3.0
github.com/nspcc-dev/tzhash v1.4.0
github.com/panjf2000/ants/v2 v2.3.0

BIN
go.sum

Binary file not shown.

View file

@ -20,8 +20,13 @@ func (s *Service) toPrm(req *objectV2.DeleteRequest, respBody *objectV2.DeleteRe
return nil, err
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
p := new(deletesvc.Prm)
p.SetCommonParameters(util.CommonPrmFromV2(req).
p.SetCommonParameters(commonPrm.
WithPrivateKey(key),
)

View file

@ -12,11 +12,43 @@ func (exec *execCtx) executeOnContainer() {
return
}
exec.log.Debug("trying to execute in container...")
lookupDepth := exec.netmapLookupDepth()
exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
)
// initialize epoch number
ok := exec.initEpoch()
if !ok {
return
}
for {
if exec.processCurrentEpoch() {
break
}
// check the maximum depth has been reached
if lookupDepth == 0 {
break
}
lookupDepth--
// go to the previous epoch
exec.curProcEpoch--
}
}
func (exec *execCtx) processCurrentEpoch() bool {
exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch),
)
traverser, ok := exec.generateTraverser(exec.address())
if !ok {
return
return true
}
ctx, cancel := context.WithCancel(exec.context())
@ -24,12 +56,12 @@ func (exec *execCtx) executeOnContainer() {
exec.status = statusUndefined
loop:
for {
addrs := traverser.Next()
if len(addrs) == 0 {
exec.log.Debug("no more nodes, abort placement iteration")
break
return false
}
for i := range addrs {
@ -38,7 +70,8 @@ loop:
exec.log.Debug("interrupt placement iteration by context",
zap.String("error", ctx.Err().Error()),
)
break loop
return true
default:
}
@ -47,7 +80,7 @@ loop:
// we reach the best result - split info with linking object ID.
if exec.processNode(ctx, addrs[i]) {
exec.log.Debug("completing the operation")
break loop
return true
}
}
}

View file

@ -10,6 +10,7 @@ import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
@ -38,6 +39,8 @@ type execCtx struct {
curOff uint64
head bool
curProcEpoch uint64
}
type execOption func(*execCtx)
@ -106,7 +109,9 @@ func (exec execCtx) key() *ecdsa.PrivateKey {
}
func (exec execCtx) callOptions() []client.CallOption {
return exec.prm.common.RemoteCallOptions()
return exec.prm.common.RemoteCallOptions(
util.WithNetmapEpoch(exec.curProcEpoch),
)
}
func (exec execCtx) remotePrm() *client.GetObjectParams {
@ -135,8 +140,40 @@ func (exec *execCtx) headOnly() bool {
return exec.head
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}
func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}
e, err := exec.svc.currentEpochReceiver.currentEpoch()
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get current epoch number",
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.curProcEpoch = e
return true
}
}
func (exec *execCtx) generateTraverser(addr *objectSDK.Address) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr)
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr, exec.curProcEpoch)
switch {
default:

View file

@ -31,7 +31,7 @@ type testStorage struct {
type testTraverserGenerator struct {
c *container.Container
b placement.Builder
b map[uint64]placement.Builder
}
type testPlacementBuilder struct {
@ -49,6 +49,12 @@ type testClient struct {
}
}
type testEpochReceiver uint64
func (e testEpochReceiver) currentEpoch() (uint64, error) {
return uint64(e), nil
}
func newTestStorage() *testStorage {
return &testStorage{
inhumed: make(map[string]struct{}),
@ -57,11 +63,11 @@ func newTestStorage() *testStorage {
}
}
func (g *testTraverserGenerator) GenerateTraverser(addr *objectSDK.Address) (*placement.Traverser, error) {
func (g *testTraverserGenerator) GenerateTraverser(addr *objectSDK.Address, e uint64) (*placement.Traverser, error) {
return placement.NewTraverser(
placement.ForContainer(g.c),
placement.ForObject(addr.ObjectID()),
placement.UseBuilder(g.b),
placement.UseBuilder(g.b[e]),
placement.SuccessAfter(1),
)
}
@ -467,11 +473,17 @@ func TestGetRemoteSmall(t *testing.T) {
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
svc.assembly = true
const curEpoch = 13
svc.traverserGenerator = &testTraverserGenerator{
c: cnr,
b: b,
b: map[uint64]placement.Builder{
curEpoch: b,
},
}
svc.clientCache = c
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
return svc
}
@ -1095,3 +1107,127 @@ func TestGetRemoteSmall(t *testing.T) {
})
})
}
func TestGetFromPastEpoch(t *testing.T) {
ctx := context.Background()
cnr := container.New(container.WithPolicy(new(netmap.PlacementPolicy)))
cid := container.CalculateID(cnr)
addr := generateAddress()
addr.SetContainerID(cid)
payloadSz := uint64(10)
payload := make([]byte, payloadSz)
_, _ = rand.Read(payload)
obj := generateObject(addr, nil, payload)
ns, as := testNodeMatrix(t, []int{2, 2})
c11 := newTestClient()
c11.addResult(addr, nil, errors.New("any error"))
c12 := newTestClient()
c12.addResult(addr, nil, errors.New("any error"))
c21 := newTestClient()
c21.addResult(addr, nil, errors.New("any error"))
c22 := newTestClient()
c22.addResult(addr, obj, nil)
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
svc.assembly = true
const curEpoch = 13
svc.traverserGenerator = &testTraverserGenerator{
c: cnr,
b: map[uint64]placement.Builder{
curEpoch: &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns[:1],
},
},
curEpoch - 1: &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns[1:],
},
},
},
}
svc.clientCache = &testClientCache{
clients: map[string]*testClient{
as[0][0]: c11,
as[0][1]: c12,
as[1][0]: c21,
as[1][1]: c22,
},
}
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
w := NewSimpleObjectWriter()
commonPrm := new(util.CommonPrm)
p := Prm{}
p.SetObjectWriter(w)
p.SetCommonParameters(commonPrm)
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.True(t, errors.Is(err, object.ErrNotFound))
commonPrm.SetNetmapLookupDepth(1)
err = svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, obj.Object(), w.Object())
rp := RangePrm{}
rp.SetChunkWriter(w)
commonPrm.SetNetmapLookupDepth(0)
rp.SetCommonParameters(commonPrm)
rp.WithAddress(addr)
off, ln := payloadSz/3, payloadSz/3
r := objectSDK.NewRange()
r.SetOffset(off)
r.SetLength(ln)
rp.SetRange(r)
err = svc.GetRange(ctx, rp)
require.True(t, errors.Is(err, object.ErrNotFound))
w = NewSimpleObjectWriter()
rp.SetChunkWriter(w)
commonPrm.SetNetmapLookupDepth(1)
err = svc.GetRange(ctx, rp)
require.NoError(t, err)
require.Equal(t, payload[off:off+ln], w.Object().Payload())
hp := HeadPrm{}
hp.SetHeaderWriter(w)
commonPrm.SetNetmapLookupDepth(0)
hp.SetCommonParameters(commonPrm)
hp.WithAddress(addr)
err = svc.Head(ctx, hp)
require.True(t, errors.Is(err, object.ErrNotFound))
w = NewSimpleObjectWriter()
hp.SetHeaderWriter(w)
commonPrm.SetNetmapLookupDepth(1)
err = svc.Head(ctx, hp)
require.NoError(t, err)
require.Equal(t, obj.CutPayload().Object(), w.Object())
}

View file

@ -5,6 +5,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/client"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
@ -40,7 +41,11 @@ type cfg struct {
}
traverserGenerator interface {
GenerateTraverser(*objectSDK.Address) (*placement.Traverser, error)
GenerateTraverser(*objectSDK.Address, uint64) (*placement.Traverser, error)
}
currentEpochReceiver interface {
currentEpoch() (uint64, error)
}
}
@ -110,3 +115,13 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
c.traverserGenerator = t
}
}
// WithNetMapSource returns option to set network
// map storage to receive current network state.
func WithNetMapSource(nmSrc netmap.Source) Option {
return func(c *cfg) {
c.currentEpochReceiver = &nmSrcWrapper{
nmSrc: nmSrc,
}
}
}

View file

@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/client"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
@ -43,6 +44,10 @@ type hasherWrapper struct {
hash io.Writer
}
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func NewSimpleObjectWriter() *SimpleObjectWriter {
return &SimpleObjectWriter{
obj: object.NewRaw(),
@ -162,3 +167,7 @@ func (h *hasherWrapper) WriteChunk(p []byte) error {
_, err := h.hash.Write(p)
return err
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}

View file

@ -24,8 +24,13 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
return nil, err
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
p := new(getsvc.Prm)
p.SetCommonParameters(util.CommonPrmFromV2(req).
p.SetCommonParameters(commonPrm.
WithPrivateKey(key),
)
@ -45,8 +50,13 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
return nil, err
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
p := new(getsvc.RangePrm)
p.SetCommonParameters(util.CommonPrmFromV2(req).
p.SetCommonParameters(commonPrm.
WithPrivateKey(key),
)
@ -67,8 +77,13 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
return nil, err
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
p := new(getsvc.RangeHashPrm)
p.SetCommonParameters(util.CommonPrmFromV2(req).
p.SetCommonParameters(commonPrm.
WithPrivateKey(key),
)
@ -125,8 +140,13 @@ func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadRespon
return nil, err
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
p := new(getsvc.HeadPrm)
p.SetCommonParameters(util.CommonPrmFromV2(req).
p.SetCommonParameters(commonPrm.
WithPrivateKey(key),
)

View file

@ -13,7 +13,12 @@ type streamer struct {
func (s *streamer) Send(req *object.PutRequest) (err error) {
switch v := req.GetBody().GetObjectPart().(type) {
case *object.PutObjectPartInit:
if err = s.stream.Init(toInitPrm(v, req)); err != nil {
initPrm, err := toInitPrm(v, req)
if err != nil {
return err
}
if err = s.stream.Init(initPrm); err != nil {
err = errors.Wrapf(err, "(%T) could not init object put stream", s)
}
case *object.PutObjectPartChunk:

View file

@ -7,17 +7,22 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
)
func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) *putsvc.PutInitPrm {
func toInitPrm(part *objectV2.PutObjectPartInit, req *objectV2.PutRequest) (*putsvc.PutInitPrm, error) {
oV2 := new(objectV2.Object)
oV2.SetObjectID(part.GetObjectID())
oV2.SetSignature(part.GetSignature())
oV2.SetHeader(part.GetHeader())
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
return new(putsvc.PutInitPrm).
WithObject(
object.NewRawFromV2(oV2),
).
WithCommonPrm(util.CommonPrmFromV2(req))
WithCommonPrm(commonPrm), nil
}
func toChunkPrm(req *objectV2.PutObjectPartChunk) *putsvc.PutChunkPrm {

View file

@ -12,17 +12,51 @@ func (exec *execCtx) executeOnContainer() {
return
}
exec.log.Debug("trying to execute in container...")
lookupDepth := exec.netmapLookupDepth()
exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
)
// initialize epoch number
ok := exec.initEpoch()
if !ok {
return
}
for {
if exec.processCurrentEpoch() {
break
}
// check the maximum depth has been reached
if lookupDepth == 0 {
break
}
lookupDepth--
// go to the previous epoch
exec.curProcEpoch--
}
exec.status = statusOK
exec.err = nil
}
func (exec *execCtx) processCurrentEpoch() bool {
exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch),
)
traverser, ok := exec.generateTraverser(exec.containerID())
if !ok {
return
return true
}
ctx, cancel := context.WithCancel(exec.context())
defer cancel()
loop:
for {
addrs := traverser.Next()
if len(addrs) == 0 {
@ -36,7 +70,8 @@ loop:
exec.log.Debug("interrupt placement iteration by context",
zap.String("error", ctx.Err().Error()),
)
break loop
return true
default:
}
@ -45,6 +80,5 @@ loop:
}
}
exec.status = statusOK
exec.err = nil
return false
}

View file

@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
@ -28,6 +29,8 @@ type execCtx struct {
statusError
log *logger.Logger
curProcEpoch uint64
}
const (
@ -64,7 +67,9 @@ func (exec execCtx) key() *ecdsa.PrivateKey {
}
func (exec execCtx) callOptions() []client.CallOption {
return exec.prm.common.RemoteCallOptions()
return exec.prm.common.RemoteCallOptions(
util.WithNetmapEpoch(exec.curProcEpoch),
)
}
func (exec execCtx) remotePrm() *client.SearchObjectParams {
@ -79,8 +84,40 @@ func (exec *execCtx) searchFilters() objectSDK.SearchFilters {
return exec.prm.SearchFilters()
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}
func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}
e, err := exec.svc.currentEpochReceiver.currentEpoch()
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get current epoch number",
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.curProcEpoch = e
return true
}
}
func (exec *execCtx) generateTraverser(cid *container.ID) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.generateTraverser(cid)
t, err := exec.svc.traverserGenerator.generateTraverser(cid, exec.curProcEpoch)
switch {
default:

View file

@ -31,7 +31,7 @@ type testStorage struct {
type testTraverserGenerator struct {
c *container.Container
b placement.Builder
b map[uint64]placement.Builder
}
type testPlacementBuilder struct {
@ -46,6 +46,12 @@ type simpleIDWriter struct {
ids []*objectSDK.ID
}
type testEpochReceiver uint64
func (e testEpochReceiver) currentEpoch() (uint64, error) {
return uint64(e), nil
}
func (s *simpleIDWriter) WriteIDs(ids []*objectSDK.ID) error {
s.ids = append(s.ids, ids...)
return nil
@ -57,10 +63,10 @@ func newTestStorage() *testStorage {
}
}
func (g *testTraverserGenerator) generateTraverser(_ *container.ID) (*placement.Traverser, error) {
func (g *testTraverserGenerator) generateTraverser(_ *container.ID, epoch uint64) (*placement.Traverser, error) {
return placement.NewTraverser(
placement.ForContainer(g.c),
placement.UseBuilder(g.b),
placement.UseBuilder(g.b[epoch]),
placement.WithoutSuccessTracking(),
)
}
@ -71,7 +77,10 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap
return nil, errors.New("vectors for address not found")
}
return vs, nil
res := make([]netmap.Nodes, len(vs))
copy(res, vs)
return res, nil
}
func (c *testClientCache) get(_ *ecdsa.PrivateKey, addr string) (searchClient, error) {
@ -217,41 +226,6 @@ func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) {
return mNodes, mAddr
}
//
// func generateChain(ln int, cid *container.ID) ([]*object.RawObject, []*objectSDK.ID, []byte) {
// curID := generateID()
// var prevID *objectSDK.ID
//
// addr := objectSDK.NewAddress()
// addr.SetContainerID(cid)
//
// res := make([]*object.RawObject, 0, ln)
// ids := make([]*objectSDK.ID, 0, ln)
// payload := make([]byte, 0, ln*10)
//
// for i := 0; i < ln; i++ {
// ids = append(ids, curID)
// addr.SetObjectID(curID)
//
// payloadPart := make([]byte, 10)
// rand.Read(payloadPart)
//
// o := generateObject(addr, prevID, []byte{byte(i)})
// o.SetPayload(payloadPart)
// o.SetPayloadSize(uint64(len(payloadPart)))
// o.SetID(curID)
//
// payload = append(payload, payloadPart...)
//
// res = append(res, o)
//
// prevID = curID
// curID = generateID()
// }
//
// return res, ids, payload
// }
func TestGetRemoteSmall(t *testing.T) {
ctx := context.Background()
@ -276,11 +250,16 @@ func TestGetRemoteSmall(t *testing.T) {
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
const curEpoch = 13
svc.traverserGenerator = &testTraverserGenerator{
c: cnr,
b: b,
b: map[uint64]placement.Builder{
curEpoch: b,
},
}
svc.clientCache = c
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
return svc
}
@ -334,3 +313,113 @@ func TestGetRemoteSmall(t *testing.T) {
}
})
}
func TestGetFromPastEpoch(t *testing.T) {
ctx := context.Background()
placementDim := []int{2, 2}
rs := make([]*netmap.Replica, 0, len(placementDim))
for i := range placementDim {
r := netmap.NewReplica()
r.SetCount(uint32(placementDim[i]))
rs = append(rs, r)
}
pp := netmap.NewPlacementPolicy()
pp.SetReplicas(rs...)
cnr := container.New(container.WithPolicy(pp))
cid := container.CalculateID(cnr)
addr := objectSDK.NewAddress()
addr.SetContainerID(cid)
ns, as := testNodeMatrix(t, placementDim)
c11 := newTestStorage()
ids11 := generateIDs(10)
c11.addResult(cid, ids11, nil)
c12 := newTestStorage()
ids12 := generateIDs(10)
c12.addResult(cid, ids12, nil)
c21 := newTestStorage()
ids21 := generateIDs(10)
c21.addResult(cid, ids21, nil)
c22 := newTestStorage()
ids22 := generateIDs(10)
c22.addResult(cid, ids22, nil)
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
const curEpoch = 13
svc.traverserGenerator = &testTraverserGenerator{
c: cnr,
b: map[uint64]placement.Builder{
curEpoch: &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns[:1],
},
},
curEpoch - 1: &testPlacementBuilder{
vectors: map[string][]netmap.Nodes{
addr.String(): ns[1:],
},
},
},
}
svc.clientCache = &testClientCache{
clients: map[string]*testStorage{
as[0][0]: c11,
as[0][1]: c12,
as[1][0]: c21,
as[1][1]: c22,
},
}
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
w := new(simpleIDWriter)
p := Prm{}
p.WithContainerID(cid)
p.SetWriter(w)
commonPrm := new(util.CommonPrm)
p.SetCommonParameters(commonPrm)
assertContains := func(idsList ...[]*objectSDK.ID) {
var sz int
for _, ids := range idsList {
sz += len(ids)
for _, id := range ids {
require.Contains(t, w.ids, id)
}
}
require.Len(t, w.ids, sz)
}
err := svc.Search(ctx, p)
require.NoError(t, err)
assertContains(ids11, ids12)
commonPrm.SetNetmapLookupDepth(1)
w = new(simpleIDWriter)
p.SetWriter(w)
err = svc.Search(ctx, p)
require.NoError(t, err)
assertContains(ids11, ids12, ids21, ids22)
}

View file

@ -6,6 +6,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
@ -39,7 +40,11 @@ type cfg struct {
}
traverserGenerator interface {
generateTraverser(*container.ID) (*placement.Traverser, error)
generateTraverser(*container.ID, uint64) (*placement.Traverser, error)
}
currentEpochReceiver interface {
currentEpoch() (uint64, error)
}
}
@ -100,3 +105,13 @@ func WithTraverserGenerator(t *util.TraverserGenerator) Option {
c.traverserGenerator = (*traverseGeneratorWrapper)(t)
}
}
// WithNetMapSource returns option to set network
// map storage to receive current network state.
func WithNetMapSource(nmSrc netmap.Source) Option {
return func(c *cfg) {
c.currentEpochReceiver = &nmSrcWrapper{
nmSrc: nmSrc,
}
}
}

View file

@ -7,6 +7,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
@ -35,6 +36,10 @@ type storageEngineWrapper engine.StorageEngine
type traverseGeneratorWrapper util.TraverserGenerator
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func newUniqueAddressWriter(w IDListWriter) IDListWriter {
return &uniqueIDWriter{
written: make(map[string]struct{}),
@ -102,9 +107,13 @@ func idsFromAddresses(addrs []*objectSDK.Address) []*objectSDK.ID {
return ids
}
func (e *traverseGeneratorWrapper) generateTraverser(cid *container.ID) (*placement.Traverser, error) {
func (e *traverseGeneratorWrapper) generateTraverser(cid *container.ID, epoch uint64) (*placement.Traverser, error) {
a := objectSDK.NewAddress()
a.SetContainerID(cid)
return (*util.TraverserGenerator)(e).GenerateTraverser(a)
return (*util.TraverserGenerator)(e).GenerateTraverser(a, epoch)
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}

View file

@ -18,8 +18,13 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
return nil, err
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
p := new(searchsvc.Prm)
p.SetCommonParameters(util.CommonPrmFromV2(req).
p.SetCommonParameters(commonPrm.
WithPrivateKey(key),
)

View file

@ -116,12 +116,13 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav
}
}
// GenerateTraverser generates placement Traverser for provided object address.
func (g *TraverserGenerator) GenerateTraverser(addr *object.Address) (*placement.Traverser, error) {
// get latest network map
nm, err := netmap.GetLatestNetworkMap(g.netMapSrc)
// GenerateTraverser generates placement Traverser for provided object address
// using epoch-th network map.
func (g *TraverserGenerator) GenerateTraverser(addr *object.Address, epoch uint64) (*placement.Traverser, error) {
// get network map by epoch
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, errors.Wrapf(err, "could not get latest network map")
return nil, errors.Wrapf(err, "could not get network map #%d", epoch)
}
// get container related container

View file

@ -2,6 +2,7 @@ package util
import (
"crypto/ecdsa"
"strconv"
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
@ -12,6 +13,8 @@ import (
type CommonPrm struct {
local bool
netmapEpoch, netmapLookupDepth uint64
token *token.SessionToken
bearer *token.BearerToken
@ -21,6 +24,12 @@ type CommonPrm struct {
callOpts []client.CallOption
}
type remoteCallOpts struct {
opts []client.CallOption
}
type DynamicCallOption func(*remoteCallOpts)
func (p *CommonPrm) WithLocalOnly(v bool) *CommonPrm {
if p != nil {
p.local = v
@ -81,14 +90,32 @@ func (p *CommonPrm) WithRemoteCallOptions(opts ...client.CallOption) *CommonPrm
}
// RemoteCallOptions return call options for remote client calls.
func (p *CommonPrm) RemoteCallOptions() []client.CallOption {
func (p *CommonPrm) RemoteCallOptions(dynamic ...DynamicCallOption) []client.CallOption {
if p != nil {
return p.callOpts
o := &remoteCallOpts{
opts: p.callOpts,
}
for _, applier := range dynamic {
applier(o)
}
return o.opts
}
return nil
}
func WithNetmapEpoch(v uint64) DynamicCallOption {
return func(o *remoteCallOpts) {
xHdr := pkg.NewXHeader()
xHdr.SetKey(session.XHeaderNetmapEpoch)
xHdr.SetValue(strconv.FormatUint(v, 10))
o.opts = append(o.opts, client.WithXHeader(xHdr))
}
}
func (p *CommonPrm) SessionToken() *token.SessionToken {
if p != nil {
return p.token
@ -105,9 +132,31 @@ func (p *CommonPrm) BearerToken() *token.BearerToken {
return nil
}
func (p *CommonPrm) NetmapEpoch() uint64 {
if p != nil {
return p.netmapEpoch
}
return 0
}
func (p *CommonPrm) NetmapLookupDepth() uint64 {
if p != nil {
return p.netmapLookupDepth
}
return 0
}
func (p *CommonPrm) SetNetmapLookupDepth(v uint64) {
if p != nil {
p.netmapLookupDepth = v
}
}
func CommonPrmFromV2(req interface {
GetMetaHeader() *session.RequestMetaHeader
}) *CommonPrm {
}) (*CommonPrm, error) {
meta := req.GetMetaHeader()
xHdrs := meta.GetXHeaders()
@ -134,12 +183,29 @@ func CommonPrmFromV2(req interface {
}
for i := range xHdrs {
prm.callOpts = append(prm.callOpts,
client.WithXHeader(
pkg.NewXHeaderFromV2(xHdrs[i]),
),
)
switch xHdrs[i].GetKey() {
case session.XHeaderNetmapEpoch:
var err error
prm.netmapEpoch, err = strconv.ParseUint(xHdrs[i].GetValue(), 10, 64)
if err != nil {
return nil, err
}
case session.XHeaderNetmapLookupDepth:
var err error
prm.netmapLookupDepth, err = strconv.ParseUint(xHdrs[i].GetValue(), 10, 64)
if err != nil {
return nil, err
}
default:
prm.callOpts = append(prm.callOpts,
client.WithXHeader(
pkg.NewXHeaderFromV2(xHdrs[i]),
),
)
}
}
return prm
return prm, nil
}