frostfs-node/pkg/services/object_manager/placement/traverser.go
Leonard Lyubich 5470d94416 [] placement: Fix local and single-success placement traversal
In previous implementation placement traverser processed incorrectly with
local placement build. Also entity incorrectly traversed the placement
vectors for fixed number read operations until success. The erroneous
behavior was due to the use of a vector number of successes instead of
a scalar number in these scenarios.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-12-01 19:36:46 +03:00

233 lines
4.7 KiB
Go

package placement
import (
"sync"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/pkg/errors"
)
// Builder is an interface of the
// object placement vector builder.
type Builder interface {
// BuildPlacement returns the list of placement vectors
// for object according to the placement policy.
//
// Must return all container nodes if object identifier
// is nil.
BuildPlacement(*object.Address, *netmap.PlacementPolicy) ([]netmap.Nodes, error)
}
// Option represents placement traverser option.
type Option func(*cfg)
// Traverser represents utility for controlling
// traversal of object placement vectors.
type Traverser struct {
mtx *sync.RWMutex
vectors []netmap.Nodes
rem []int
}
type cfg struct {
trackCopies bool
flatSuccess *uint32
addr *object.Address
policy *netmap.PlacementPolicy
builder Builder
}
const invalidOptsMsg = "invalid traverser options"
var errNilBuilder = errors.New("placement builder is nil")
var errNilPolicy = errors.New("placement policy is nil")
func defaultCfg() *cfg {
return &cfg{
trackCopies: true,
addr: object.NewAddress(),
}
}
// NewTraverser creates, initializes with options and returns Traverser instance.
func NewTraverser(opts ...Option) (*Traverser, error) {
cfg := defaultCfg()
for i := range opts {
if opts[i] != nil {
opts[i](cfg)
}
}
if cfg.builder == nil {
return nil, errors.Wrap(errNilBuilder, invalidOptsMsg)
} else if cfg.policy == nil {
return nil, errors.Wrap(errNilPolicy, invalidOptsMsg)
}
ns, err := cfg.builder.BuildPlacement(cfg.addr, cfg.policy)
if err != nil {
return nil, errors.Wrap(err, "could not build placement")
}
var rem []int
if cfg.flatSuccess != nil {
ns = flatNodes(ns)
rem = []int{int(*cfg.flatSuccess)}
} else {
rs := cfg.policy.Replicas()
rem = make([]int, 0, len(rs))
for i := range rs {
if cfg.trackCopies {
rem = append(rem, int(rs[i].Count()))
} else {
rem = append(rem, -1)
}
}
}
return &Traverser{
mtx: new(sync.RWMutex),
rem: rem,
vectors: ns,
}, nil
}
func flatNodes(ns []netmap.Nodes) []netmap.Nodes {
sz := 0
for i := range ns {
sz += len(ns[i])
}
flat := make(netmap.Nodes, 0, sz)
for i := range ns {
flat = append(flat, ns[i]...)
}
return []netmap.Nodes{flat}
}
// Next returns next unprocessed address of the object placement.
//
// Returns nil if no nodes left or traversal operation succeeded.
func (t *Traverser) Next() []*network.Address {
t.mtx.Lock()
defer t.mtx.Unlock()
t.skipEmptyVectors()
if len(t.vectors) == 0 {
return nil
} else if len(t.vectors[0]) < t.rem[0] {
return nil
}
count := t.rem[0]
if count < 0 {
count = len(t.vectors[0])
}
addrs := make([]*network.Address, 0, count)
for i := 0; i < count; i++ {
addr, err := network.AddressFromString(t.vectors[0][i].Address())
if err != nil {
// TODO: log error
return nil
}
addrs = append(addrs, addr)
}
t.vectors[0] = t.vectors[0][count:]
return addrs
}
func (t *Traverser) skipEmptyVectors() {
for i := 0; i < len(t.vectors); i++ { // don't use range, slice changes in body
if len(t.vectors[i]) == 0 && t.rem[i] <= 0 || t.rem[0] == 0 {
t.vectors = append(t.vectors[:i], t.vectors[i+1:]...)
t.rem = append(t.rem[:i], t.rem[i+1:]...)
i--
} else {
break
}
}
}
// SubmitSuccess writes single succeeded node operation.
func (t *Traverser) SubmitSuccess() {
t.mtx.Lock()
if len(t.rem) > 0 {
t.rem[0]--
}
t.mtx.Unlock()
}
// Success returns true if traversal operation succeeded.
func (t *Traverser) Success() bool {
t.mtx.RLock()
defer t.mtx.RUnlock()
for i := range t.rem {
if t.rem[i] > 0 {
return false
}
}
return true
}
// UseBuilder is a placement builder setting option.
//
// Overlaps UseNetworkMap option.
func UseBuilder(b Builder) Option {
return func(c *cfg) {
c.builder = b
}
}
// ForContainer is a traversal container setting option.
func ForContainer(cnr *container.Container) Option {
return func(c *cfg) {
c.policy = cnr.PlacementPolicy()
c.addr.SetContainerID(container.CalculateID(cnr))
}
}
// ForObject is a processing object setting option.
func ForObject(id *object.ID) Option {
return func(c *cfg) {
c.addr.SetObjectID(id)
}
}
// SuccessAfter is a flat success number setting option.
//
// Option has no effect if the number is not positive.
func SuccessAfter(v uint32) Option {
return func(c *cfg) {
if v > 0 {
c.flatSuccess = &v
}
}
}
// WithoutSuccessTracking disables success tracking in traversal.
func WithoutSuccessTracking() Option {
return func(c *cfg) {
c.trackCopies = false
}
}