frostfs-node/pkg/services/object/util/placement.go
Leonard Lyubich 6e5d7f84af [#607] network: Generalize LocalAddressSource to address group
Make `LocalAddressSource.LocalAddress` method to return `AddressGroup`. Make
`IsLocalAddress` function to accept parameter of type `AddressGroup`. Adopt
the application code with temporary `GroupFromAddress` helper.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2021-06-28 15:52:50 +03:00

161 lines
4.4 KiB
Go

package util
import (
"fmt"
netmapSDK "github.com/nspcc-dev/neofs-api-go/pkg/netmap"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
)
type localPlacement struct {
builder placement.Builder
localAddrSrc network.LocalAddressSource
}
type remotePlacement struct {
builder placement.Builder
localAddrSrc network.LocalAddressSource
}
// TraverserGenerator represents tool that generates
// container traverser for the particular need.
type TraverserGenerator struct {
netMapSrc netmap.Source
cnrSrc container.Source
localAddrSrc network.LocalAddressSource
customOpts []placement.Option
}
func NewLocalPlacement(b placement.Builder, s network.LocalAddressSource) placement.Builder {
return &localPlacement{
builder: b,
localAddrSrc: s,
}
}
func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.PlacementPolicy) ([]netmapSDK.Nodes, error) {
vs, err := p.builder.BuildPlacement(addr, policy)
if err != nil {
return nil, fmt.Errorf("(%T) could not build object placement: %w", p, err)
}
for i := range vs {
for j := range vs[i] {
var addr network.Address
err := addr.FromString(vs[i][j].Address())
if err != nil {
// TODO: log error
continue
}
if network.IsLocalAddress(p.localAddrSrc, network.GroupFromAddress(addr)) {
return []netmapSDK.Nodes{{vs[i][j]}}, nil
}
}
}
return nil, fmt.Errorf("(%T) local node is outside of object placement", p)
}
// NewRemotePlacementBuilder creates, initializes and returns placement builder that
// excludes local node from any placement vector.
func NewRemotePlacementBuilder(b placement.Builder, s network.LocalAddressSource) placement.Builder {
return &remotePlacement{
builder: b,
localAddrSrc: s,
}
}
func (p *remotePlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.PlacementPolicy) ([]netmapSDK.Nodes, error) {
vs, err := p.builder.BuildPlacement(addr, policy)
if err != nil {
return nil, fmt.Errorf("(%T) could not build object placement: %w", p, err)
}
for i := range vs {
for j := 0; j < len(vs[i]); j++ {
var addr network.Address
err := addr.FromString(vs[i][j].Address())
if err != nil {
// TODO: log error
continue
}
if network.IsLocalAddress(p.localAddrSrc, network.GroupFromAddress(addr)) {
vs[i] = append(vs[i][:j], vs[i][j+1:]...)
j--
}
}
}
return vs, nil
}
// NewTraverserGenerator creates, initializes and returns new TraverserGenerator instance.
func NewTraverserGenerator(nmSrc netmap.Source, cnrSrc container.Source, localAddrSrc network.LocalAddressSource) *TraverserGenerator {
return &TraverserGenerator{
netMapSrc: nmSrc,
cnrSrc: cnrSrc,
localAddrSrc: localAddrSrc,
}
}
// WithTraverseOptions returns TraverseGenerator that additionally applies provided options.
func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *TraverserGenerator {
return &TraverserGenerator{
netMapSrc: g.netMapSrc,
cnrSrc: g.cnrSrc,
localAddrSrc: g.localAddrSrc,
customOpts: opts,
}
}
// GenerateTraverser generates placement Traverser for provided object address
// using epoch-th network map.
func (g *TraverserGenerator) GenerateTraverser(addr *object.Address, epoch uint64) (*placement.Traverser, error) {
// get network map by epoch
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
}
// get container related container
cnr, err := g.cnrSrc.Get(addr.ContainerID())
if err != nil {
return nil, fmt.Errorf("could not get container: %w", err)
}
// allocate placement traverser options
traverseOpts := make([]placement.Option, 0, 3+len(g.customOpts))
traverseOpts = append(traverseOpts, g.customOpts...)
// create builder of the remote nodes from network map
builder := NewRemotePlacementBuilder(
placement.NewNetworkMapBuilder(nm),
g.localAddrSrc,
)
traverseOpts = append(traverseOpts,
// set processing container
placement.ForContainer(cnr),
// set identifier of the processing object
placement.ForObject(addr.ObjectID()),
// set placement builder
placement.UseBuilder(builder),
)
return placement.NewTraverser(traverseOpts...)
}