frostfs-node/pkg/services/policer/check.go
Leonard Lyubich e99e25b52f [#1680] replicator: Consider nodes under maintenance as OK
Nodes under maintenance SHOULD not respond to object requests. Based on
this, storage node's Policer SHOULD consider such nodes as problem ones.
However, to prevent spam with the new replicas, on the contrary, Policer
should consider them normal.

Make `Policer.processNodes` to exclude elements if `IsMaintenance()`
with shortage decreasing.

Signed-off-by: Leonard Lyubich <ctulhurider@gmail.com>
2022-10-11 12:54:27 +03:00

222 lines
5.1 KiB
Go

package policer
import (
"context"
"encoding/hex"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
// tracks Policer's check progress
type nodeCache map[uint64]bool
func newNodeCache() *nodeCache {
m := make(map[uint64]bool)
return (*nodeCache)(&m)
}
func (n *nodeCache) set(node netmap.NodeInfo, val bool) {
(*n)[node.Hash()] = val
}
// submits storage node as a candidate to store the object replica in case of
// shortage.
func (n *nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
n.set(node, false)
}
// submits storage node as a current object replica holder.
func (n *nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
n.set(node, true)
}
// processStatus returns current processing status of the storage node
//
// >0 if node does not currently hold the object
// 0 if node already holds the object
// <0 if node has not been processed yet
func (n *nodeCache) processStatus(node netmap.NodeInfo) int8 {
val, ok := (*n)[node.Hash()]
if !ok {
return -1
}
if val {
return 0
}
return 1
}
// SubmitSuccessfulReplication marks given storage node as a current object
// replica holder.
//
// SubmitSuccessfulReplication implements replicator.TaskResult.
func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
n.submitReplicaHolder(node)
}
func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
idCnr := addr.Container()
cnr, err := p.cnrSrc.Get(idCnr)
if err != nil {
p.log.Error("could not get container",
zap.Stringer("cid", idCnr),
zap.String("error", err.Error()),
)
if container.IsErrNotFound(err) {
var prm engine.InhumePrm
prm.MarkAsGarbage(addr)
prm.WithForceRemoval()
_, err := p.jobQueue.localStorage.Inhume(prm)
if err != nil {
p.log.Error("could not inhume object with missing container",
zap.Stringer("cid", idCnr),
zap.Stringer("oid", addr.Object()),
zap.String("error", err.Error()))
}
}
return
}
policy := cnr.Value.PlacementPolicy()
obj := addr.Object()
nn, err := p.placementBuilder.BuildPlacement(idCnr, &obj, policy)
if err != nil {
p.log.Error("could not build placement vector for object",
zap.Stringer("cid", idCnr),
zap.String("error", err.Error()),
)
return
}
c := &processPlacementContext{
Context: ctx,
}
var numOfContainerNodes int
for i := range nn {
numOfContainerNodes += len(nn[i])
}
// cached info about already checked nodes
checkedNodes := newNodeCache()
for i := range nn {
select {
case <-ctx.Done():
return
default:
}
p.processNodes(c, addr, nn[i], policy.ReplicaNumberByIndex(i), checkedNodes)
}
if !c.needLocalCopy {
p.log.Info("redundant local object copy detected",
zap.Stringer("object", addr),
)
p.cbRedundantCopy(addr)
}
}
type processPlacementContext struct {
context.Context
needLocalCopy bool
}
func (p *Policer) processNodes(ctx *processPlacementContext, addr oid.Address,
nodes []netmap.NodeInfo, shortage uint32, checkedNodes *nodeCache) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)
handleMaintenance := func(node netmap.NodeInfo) {
// consider remote nodes under maintenance as problem OK. Such
// nodes MAY not respond with object, however, this is how we
// prevent spam with new replicas.
checkedNodes.submitReplicaHolder(node)
shortage--
p.log.Debug("consider node under maintenance as OK",
zap.String("node", hex.EncodeToString(node.PublicKey())),
)
}
for i := 0; shortage > 0 && i < len(nodes); i++ {
select {
case <-ctx.Done():
return
default:
}
if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) {
ctx.needLocalCopy = true
shortage--
} else if nodes[i].IsMaintenance() {
handleMaintenance(nodes[i])
} else {
if status := checkedNodes.processStatus(nodes[i]); status >= 0 {
if status == 0 {
// node already contains replica, no need to replicate
nodes = append(nodes[:i], nodes[i+1:]...)
i--
shortage--
}
continue
}
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i]))
cancel()
if client.IsErrObjectNotFound(err) {
checkedNodes.submitReplicaCandidate(nodes[i])
continue
}
if err != nil {
p.log.Error("receive object header to check policy compliance",
zap.Stringer("object", addr),
zap.String("error", err.Error()),
)
} else {
shortage--
checkedNodes.submitReplicaHolder(nodes[i])
}
}
nodes = append(nodes[:i], nodes[i+1:]...)
i--
}
if shortage > 0 {
p.log.Debug("shortage of object copies detected",
zap.Stringer("object", addr),
zap.Uint32("shortage", shortage),
)
var task replicator.Task
task.SetObjectAddress(addr)
task.SetNodes(nodes)
task.SetCopiesNumber(shortage)
p.replicator.HandleTask(ctx, task, checkedNodes)
}
}