package placement import ( "errors" "fmt" "slices" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) // Builder is an interface of the // object placement vector builder. type Builder interface { // BuildPlacement returns the list of placement vectors // for object according to the placement policy. // // Must return all container nodes if object identifier // is nil. BuildPlacement(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) } type NodeState interface { // LocalNodeInfo return current node state in FrostFS API v2 NodeInfo structure. LocalNodeInfo() *netmap.NodeInfo } // Option represents placement traverser option. type Option func(*cfg) // Traverser represents utility for controlling // traversal of object placement vectors. type Traverser struct { mtx sync.RWMutex vectors [][]netmap.NodeInfo rem []int } type cfg struct { trackCopies bool copyNumbers []uint32 flatSuccess *uint32 cnr cid.ID obj *oid.ID policySet bool policy netmap.PlacementPolicy builder Builder metrics []Metric nodeState NodeState } const invalidOptsMsg = "invalid traverser options" var errNilBuilder = errors.New("placement builder is nil") var errNilPolicy = errors.New("placement policy is nil") var errCopiesNumberLen = errors.New("copies number accepts only one number or array with length " + "equal to length of replicas") func defaultCfg() *cfg { return &cfg{ trackCopies: true, } } // NewTraverser creates, initializes with options and returns Traverser instance. 
func NewTraverser(opts ...Option) (*Traverser, error) { cfg := defaultCfg() for i := range opts { if opts[i] != nil { opts[i](cfg) } } cnLen := len(cfg.copyNumbers) if cnLen > 0 && cnLen != 1 && cnLen != cfg.policy.NumberOfReplicas() { return nil, errCopiesNumberLen } if cfg.builder == nil { return nil, fmt.Errorf("%s: %w", invalidOptsMsg, errNilBuilder) } else if !cfg.policySet { return nil, fmt.Errorf("%s: %w", invalidOptsMsg, errNilPolicy) } ns, err := cfg.builder.BuildPlacement(cfg.cnr, cfg.obj, cfg.policy) if err != nil { return nil, fmt.Errorf("could not build placement: %w", err) } // backward compatibility for scalar `copies_number` if len(cfg.copyNumbers) == 1 && cfg.copyNumbers[0] != 0 { cfg.flatSuccess = &cfg.copyNumbers[0] } var rem []int if len(cfg.metrics) > 0 && cfg.nodeState != nil { rem = defaultCopiesVector(cfg.policy) var unsortedVector []netmap.NodeInfo var regularVector []netmap.NodeInfo for i := range rem { unsortedVector = append(unsortedVector, ns[i][:rem[i]]...) regularVector = append(regularVector, ns[i][rem[i]:]...) } rem = []int{-1, -1} sortedVector, err := sortVector(cfg, unsortedVector) if err != nil { return nil, err } ns = [][]netmap.NodeInfo{sortedVector, regularVector} } else if cfg.flatSuccess != nil { ns = flatNodes(ns) rem = []int{int(*cfg.flatSuccess)} } else { rem = defaultCopiesVector(cfg.policy) // Bool flag which is set when cfg.copyNumbers contains not only zeros. // In this case we should not modify `rem` slice unless track // copies are ignored, because [0, ...] means that all copies should be // stored before returning OK to the client. 
var considerCopiesNumber bool for _, val := range cfg.copyNumbers { if val != 0 { considerCopiesNumber = true break } } for i := range rem { if !cfg.trackCopies { rem[i] = -1 } else if considerCopiesNumber && len(cfg.copyNumbers) > i { rem[i] = int(cfg.copyNumbers[i]) } } } return &Traverser{ rem: rem, vectors: ns, }, nil } func defaultCopiesVector(policy netmap.PlacementPolicy) []int { replNum := policy.NumberOfReplicas() copyVector := make([]int, 0, replNum) for i := range replNum { copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects()+policy.ReplicaDescriptor(i).GetECDataCount()+policy.ReplicaDescriptor(i).GetECParityCount())) } return copyVector } func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo { sz := 0 for i := range ns { sz += len(ns[i]) } flat := make([]netmap.NodeInfo, 0, sz) for i := range ns { flat = append(flat, ns[i]...) } return [][]netmap.NodeInfo{flat} } type nodeMetrics struct { index int metrics []int } func sortVector(cfg *cfg, unsortedVector []netmap.NodeInfo) ([]netmap.NodeInfo, error) { nm := make([]nodeMetrics, len(unsortedVector)) node := cfg.nodeState.LocalNodeInfo() for i := range unsortedVector { m := make([]int, len(cfg.metrics)) for j, pm := range cfg.metrics { m[j] = pm.CalculateValue(node, &unsortedVector[i]) } nm[i] = nodeMetrics{ index: i, metrics: m, } } slices.SortFunc(nm, func(a, b nodeMetrics) int { return slices.Compare(a.metrics, b.metrics) }) sortedVector := make([]netmap.NodeInfo, len(unsortedVector)) for i := range unsortedVector { sortedVector[i] = unsortedVector[nm[i].index] } return sortedVector, nil } // Node is a descriptor of storage node with information required for intra-container communication. type Node struct { addresses network.AddressGroup externalAddresses network.AddressGroup key []byte } // Addresses returns group of network addresses. func (x Node) Addresses() network.AddressGroup { return x.addresses } // ExternalAddresses returns group of network addresses. 
func (x Node) ExternalAddresses() network.AddressGroup {
	return x.externalAddresses
}

// PublicKey returns the node's public key in binary form. Callers must not
// mutate the returned slice.
func (x Node) PublicKey() []byte {
	return x.key
}

// NewNode assembles a Node from its address groups and public key.
func NewNode(addresses network.AddressGroup, externalAddresses network.AddressGroup, key []byte) Node {
	var n Node
	n.addresses = addresses
	n.externalAddresses = externalAddresses
	n.key = key
	return n
}

// Next returns the next batch of unprocessed nodes of the object placement.
//
// It returns nil when no vectors are left, when the head vector holds fewer
// nodes than it still needs, or when a node's endpoints cannot be parsed.
func (t *Traverser) Next() []Node {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	t.skipEmptyVectors()

	if len(t.vectors) == 0 {
		return nil
	}

	head := t.vectors[0]
	want := t.rem[0]
	if len(head) < want {
		return nil
	}
	if want < 0 {
		// Negative counter: hand out the whole vector.
		want = len(head)
	}

	out := make([]Node, want)
	for i := range out {
		info := &head[i]

		if err := out[i].addresses.FromIterator(network.NodeEndpointsIterator(*info)); err != nil {
			return nil
		}

		if ext := info.ExternalAddresses(); len(ext) > 0 {
			// Ignore the error if this field is incorrectly formed.
			_ = out[i].externalAddresses.FromStringSlice(ext)
		}

		out[i].key = info.PublicKey()
	}

	t.vectors[0] = head[want:]

	return out
}

// skipEmptyVectors drops leading vectors that need no more work: those whose
// success counter has reached zero, and those that are drained and no longer
// require successes. It stops at the first vector that is still pending.
func (t *Traverser) skipEmptyVectors() {
	for len(t.vectors) > 0 {
		drained := len(t.vectors[0]) == 0 && t.rem[0] <= 0
		satisfied := t.rem[0] == 0
		if !drained && !satisfied {
			return
		}

		t.vectors = append(t.vectors[:0], t.vectors[1:]...)
		t.rem = append(t.rem[:0], t.rem[1:]...)
	}
}

// SubmitSuccess records a single successful node operation against the
// current vector.
func (t *Traverser) SubmitSuccess() {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	if len(t.rem) == 0 {
		return
	}
	t.rem[0]--
}

// Success reports whether no vector still requires successful operations.
func (t *Traverser) Success() bool {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	for _, left := range t.rem {
		if left > 0 {
			return false
		}
	}
	return true
}

// UseBuilder returns an option that sets the placement vector builder.
//
// Overlaps UseNetworkMap option.
func UseBuilder(b Builder) Option {
	return func(c *cfg) { c.builder = b }
}

// ForContainer returns an option that takes the placement policy from cnr
// and sets the container ID computed from it.
func ForContainer(cnr container.Container) Option {
	return func(c *cfg) {
		c.policy = cnr.PlacementPolicy()
		c.policySet = true
		container.CalculateID(&c.cnr, cnr)
	}
}

// ForObject returns an option that sets the processed object identifier.
func ForObject(id oid.ID) Option {
	return func(c *cfg) { c.obj = &id }
}

// SuccessAfter returns an option that sets the flat success number.
//
// Option has no effect if the number is not positive.
func SuccessAfter(v uint32) Option {
	return func(c *cfg) {
		if v == 0 {
			return
		}
		c.flatSuccess = &v
	}
}

// ResetSuccessAfter returns an option that clears a previously set flat
// success number.
func ResetSuccessAfter() Option {
	return func(c *cfg) { c.flatSuccess = nil }
}

// WithoutSuccessTracking returns an option that disables success tracking
// in traversal.
func WithoutSuccessTracking() Option {
	return func(c *cfg) { c.trackCopies = false }
}

// WithCopyNumbers returns an option that sets the copies numbers: either a
// single value or one value per replica of the placement policy. The slice
// is stored as is, not copied.
func WithCopyNumbers(v []uint32) Option {
	return func(c *cfg) { c.copyNumbers = v }
}

// WithPriorityMetrics returns an option that sets the priority metrics used
// to sort nodes.
func WithPriorityMetrics(m []Metric) Option {
	return func(c *cfg) { c.metrics = m }
}

// WithNodeState returns an option that provides the state of the current node.
func WithNodeState(s NodeState) Option {
	return func(c *cfg) { c.nodeState = s }
}