diff --git a/go.mod b/go.mod index a38a0750..91d8a572 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/multiformats/go-multiaddr-net v0.1.2 // v0.1.1 => v0.1.2 github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78 - github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200916115135-ff325b877023 + github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200917104527-95ae0a649608 github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index 339114a6..a05829f2 100644 --- a/go.sum +++ b/go.sum @@ -269,6 +269,8 @@ github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78 h1:stIa+nB github.com/nspcc-dev/neo-go v0.91.1-pre.0.20200827184617-7560aa345a78/go.mod h1:G6HdOWvzQ6tlvFdvFSN/PgCzLPN/X/X4d5hTjFRUDcc= github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200916115135-ff325b877023 h1:tltnqudivH6TBzs4DEouLx9rwPUBuvn7bjm4EZyosUc= github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200916115135-ff325b877023/go.mod h1:FsFd1z4YzoEgPlltsUgnqna9qhcF87RHYjot0pby2L4= +github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200917104527-95ae0a649608 h1:rT3MBvM3u5D8p/V8lbt0TVP75nXQSC/YCwpORrv6QEA= +github.com/nspcc-dev/neofs-api-go v1.3.1-0.20200917104527-95ae0a649608/go.mod h1:FsFd1z4YzoEgPlltsUgnqna9qhcF87RHYjot0pby2L4= github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA= github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM= diff --git a/pkg/services/object_manager/placement/netmap.go b/pkg/services/object_manager/placement/netmap.go new file mode 100644 index 00000000..e82ca0d1 --- /dev/null +++ b/pkg/services/object_manager/placement/netmap.go @@ -0,0 +1,32 @@ +package placement + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/pkg/errors" +) + +type netMapBuilder struct { + nm *netmap.Netmap +} + +func (b *netMapBuilder) BuildPlacement(a *object.Address, p *netmap.PlacementPolicy) ([]netmap.Nodes, error) { + aV2 := a.ToV2() + + cn, err := b.nm.GetContainerNodes(p, aV2.GetContainerID().GetValue()) + if err != nil { + return nil, errors.Wrap(err, "could not get container nodes") + } + + oid := aV2.GetObjectID() + if oid == nil { + return cn.Replicas(), nil + } + + on, err := b.nm.GetPlacementVectors(cn, oid.GetValue()) + if err != nil { + return nil, errors.Wrap(err, "could not get placement vectors for object") + } + + return on, nil +} diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go new file mode 100644 index 00000000..94fb47b7 --- /dev/null +++ b/pkg/services/object_manager/placement/traverser.go @@ -0,0 +1,182 @@ +package placement + +import ( + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/pkg/errors" +) + +// Builder is an interface of the +// object placement vector builder. +type Builder interface { + // BuildPlacement returns the list of placement vectors + // for object according to the placement policy. + // + // Must return all container nodes if object identifier + // is nil. + BuildPlacement(*object.Address, *netmap.PlacementPolicy) ([]netmap.Nodes, error) +} + +// Option represents placement traverser option. +type Option func(*cfg) + +// Traverser represents utility for controlling +// traversal of object placement vectors. +type Traverser struct { + mtx *sync.RWMutex + + rem int + + nextI, nextJ int + + vectors []netmap.Nodes +} + +type cfg struct { + rem int + + addr *object.Address + + policy *netmap.PlacementPolicy + + builder Builder +} + +var errNilBuilder = errors.New("placement builder is nil") + +func defaultCfg() *cfg { + return &cfg{ + rem: 1, + addr: object.NewAddress(), + } +} + +// NewTraverser creates, initializes with options and returns Traverser instance. +func NewTraverser(opts ...Option) (*Traverser, error) { + cfg := defaultCfg() + + for i := range opts { + if opts[i] != nil { + opts[i](cfg) + } + } + + if cfg.builder == nil { + return nil, errors.Wrap(errNilBuilder, "incomplete traverser options") + } + + ns, err := cfg.builder.BuildPlacement(cfg.addr, cfg.policy) + if err != nil { + return nil, errors.Wrap(err, "could not build placement") + } + + return &Traverser{ + mtx: new(sync.RWMutex), + rem: cfg.rem, + vectors: ns, + }, nil +} + +// Next returns next unprocessed address of the object placement. +// +// Returns nil if no nodes left or traversal operation succeeded. +func (t *Traverser) Next() *network.Address { + t.mtx.Lock() + defer t.mtx.Unlock() + + if t.rem == 0 || t.nextI == len(t.vectors) { + return nil + } + + addr, err := network.AddressFromString(t.vectors[t.nextI][t.nextJ].NetworkAddress()) + if err != nil { + // TODO: log error + } + + if t.nextJ++; t.nextJ == len(t.vectors[t.nextI]) { + t.nextJ = 0 + t.nextI++ + } + + return addr +} + +// SubmitSuccess writes single succeeded node operation. +func (t *Traverser) SubmitSuccess() { + t.mtx.Lock() + t.rem-- + t.mtx.Unlock() +} + +// Success returns true if traversal operation succeeded. +func (t *Traverser) Success() bool { + t.mtx.RLock() + s := t.rem <= 0 + t.mtx.RUnlock() + + return s +} + +// UseBuilder is a placement builder setting option. +// +// Overlaps UseNetworkMap option. +func UseBuilder(b Builder) Option { + return func(c *cfg) { + c.builder = b + } +} + +// UseNetworkMap is a placement builder based on network +// map setting option. +// +// Overlaps UseBuilder option. +func UseNetworkMap(nm *netmap.Netmap) Option { + return func(c *cfg) { + c.builder = &netMapBuilder{ + nm: nm, + } + } +} + +// ForContainer is a traversal container setting option. +func ForContainer(cnr *container.Container) Option { + return func(c *cfg) { + c.policy = cnr.GetPlacementPolicy() + + c.rem = 0 + for _, r := range c.policy.GetReplicas() { + c.rem += int(r.GetCount()) + } + + c.addr.SetContainerID(container.CalculateID(cnr)) + } +} + +// ForObject is a processing object setting option. +func ForObject(id *object.ID) Option { + return func(c *cfg) { + c.addr.SetObjectID(id) + } +} + +// SuccessAfter is a success number setting option. +// +// Option has no effect if the number is not positive. +func SuccessAfter(v int) Option { + return func(c *cfg) { + if v > 0 { + c.rem = v + } + } +} + +// WithoutSuccessTracking disables success tracking in traversal. +func WithoutSuccessTracking() Option { + return func(c *cfg) { + c.rem = -1 + } +} diff --git a/pkg/services/object_manager/placement/traverser_test.go b/pkg/services/object_manager/placement/traverser_test.go new file mode 100644 index 00000000..291ec070 --- /dev/null +++ b/pkg/services/object_manager/placement/traverser_test.go @@ -0,0 +1,120 @@ +package placement + +import ( + "strconv" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + netmapV2 "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" + "github.com/stretchr/testify/require" +) + +type testBuilder struct { + vectors []netmap.Nodes +} + +func (b testBuilder) BuildPlacement(*object.Address, *netmap.PlacementPolicy) ([]netmap.Nodes, error) { + return b.vectors, nil +} + +func testNode(v uint32) netmapV2.NodeInfo { + n := netmapV2.NodeInfo{} + n.SetAddress("/ip4/0.0.0.0/tcp/" + strconv.Itoa(int(v))) + + return n +} + +func flattenVectors(vs []netmap.Nodes) netmap.Nodes { + v := make(netmap.Nodes, 0) + + for i := range vs { + v = append(v, vs[i]...) + } + + return v +} + +func testPlacement(t *testing.T, sz []int) []netmap.Nodes { + res := make([]netmap.Nodes, 0, len(sz)) + num := uint32(0) + + for i := range sz { + ns := make([]netmapV2.NodeInfo, 0, sz[i]) + + for j := 0; j < sz[i]; j++ { + ns = append(ns, testNode(num)) + num++ + } + + res = append(res, netmap.NodesFromV2(ns)) + } + + return res +} + +func TestTraverserObjectScenarios(t *testing.T) { + t.Run("search scenario", func(t *testing.T) { + nodes := testPlacement(t, []int{2, 3}) + + allNodes := flattenVectors(nodes) + + tr, err := NewTraverser( + UseBuilder(&testBuilder{vectors: nodes}), + WithoutSuccessTracking(), + ) + require.NoError(t, err) + + require.True(t, tr.Success()) + + for i := range allNodes { + require.Equal(t, allNodes[i].NetworkAddress(), tr.Next().String()) + } + + require.Nil(t, tr.Next()) + require.True(t, tr.Success()) + }) + + t.Run("read scenario", func(t *testing.T) { + nodes := testPlacement(t, []int{5, 3, 4}) + + allNodes := flattenVectors(nodes) + + tr, err := NewTraverser( + UseBuilder(&testBuilder{vectors: nodes}), + ) + require.NoError(t, err) + + for i := range allNodes[:len(allNodes)-3] { + require.Equal(t, allNodes[i].NetworkAddress(), tr.Next().String()) + } + + require.False(t, tr.Success()) + + tr.SubmitSuccess() + + require.True(t, tr.Success()) + + require.Nil(t, tr.Next()) + }) + + t.Run("put scenario", func(t *testing.T) { + nodes := testPlacement(t, []int{3, 3, 3}) + sucCount := 3 + + tr, err := NewTraverser( + UseBuilder(&testBuilder{vectors: nodes}), + SuccessAfter(sucCount), + ) + require.NoError(t, err) + + for i := 0; i < sucCount; i++ { + require.NotNil(t, tr.Next()) + require.False(t, tr.Success()) + tr.SubmitSuccess() + } + + require.Nil(t, tr.Next()) + require.True(t, tr.Success()) + }) +}