frostfs-node/pkg/services/policer/check.go
Leonard Lyubich 0dab4b7581 [#108] services: Implement Policer service
Implement Policer service that performs background work to check compliance
with the placement policy for local objects in the container. In the initial
implementation, the selection of the working queue of objects is
simplified, and there is no transfer of the result to the replicator.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-10-21 14:42:51 +03:00

103 lines
2.2 KiB
Go

package policer
import (
"context"
"strings"
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/network"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
"go.uber.org/zap"
)
// processObject checks the object at addr against the placement policy of
// its container.
//
// The container is read from p.cnrSrc, placement vectors for the object are
// built with p.placementBuilder, and each vector is handed to processNodes
// together with the copy count of the policy replica descriptor at the same
// index. Any failure is logged and stops processing of this object; nothing
// is reported back to the caller. Processing also stops between vectors once
// ctx is done.
func (p *Policer) processObject(ctx context.Context, addr *object.Address) {
	cnr, err := p.cnrSrc.Get(addr.GetContainerID())
	if err != nil {
		p.log.Error("could not get container",
			zap.String("error", err.Error()),
		)

		return
	}

	policy := cnr.GetPlacementPolicy()

	nn, err := p.placementBuilder.BuildPlacement(addr, policy)
	if err != nil {
		p.log.Error("could not build placement vector for object",
			zap.String("error", err.Error()),
		)

		return
	}

	replicas := policy.GetReplicas()

	// Vectors and replica descriptors are paired by index below. Refuse to
	// proceed when a vector would have no descriptor instead of panicking
	// with an out-of-range index in the background worker.
	if len(nn) > len(replicas) {
		p.log.Error("placement vectors outnumber policy replicas",
			zap.Int("vectors", len(nn)),
			zap.Int("replicas", len(replicas)),
		)

		return
	}

	for i := range nn {
		// Non-blocking cancellation check before each vector.
		select {
		case <-ctx.Done():
			return
		default:
		}

		p.processNodes(ctx, addr, nn[i], replicas[i].GetCount())
	}
}
// processNodes walks one placement vector and counts the nodes that hold the
// object at addr, logging a shortage if too few are found.
//
// shortage is the number of copies that still have to be confirmed. It is
// decremented for every node whose address network.IsLocalAddress reports as
// local (no request is made for such a node) and for every remote node that
// answers the Head request without error. The walk stops as soon as shortage
// reaches zero, when the vector is exhausted, or when ctx is done; in the
// last case the function returns without logging.
//
// Nodes whose address cannot be parsed, and remote nodes whose Head request
// fails, do not reduce shortage. The nodes slice is only read: the caller's
// placement vector is left unmodified.
func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes netmap.Nodes, shortage uint32) {
	prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr)

	for _, n := range nodes {
		if shortage == 0 {
			break
		}

		// Non-blocking cancellation check before each node.
		select {
		case <-ctx.Done():
			return
		default:
		}

		netAddr := n.NetworkAddress()

		log := p.log.With(zap.String("node", netAddr))

		node, err := network.AddressFromString(netAddr)
		if err != nil {
			log.Error("could not parse network address")

			continue
		}

		if network.IsLocalAddress(p.localAddrSrc, node) {
			shortage--

			continue
		}

		// Bound each remote request separately and release the timer right
		// after the call instead of deferring it to function exit.
		callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)

		_, err = p.remoteHeader.Head(callCtx, prm.WithNodeAddress(node))

		cancel()

		switch {
		case err == nil:
			shortage--
		case strings.Contains(err.Error(), headsvc.ErrNotFound.Error()):
			// FIXME: this is a temporary solution to resolve 404 response from remote node
			// We need to distinguish problem nodes from nodes without an object.
		default:
			log.Error("could not receive object header",
				zap.String("error", err.Error()),
			)
		}
	}

	if shortage > 0 {
		p.log.Info("shortage of object copies detected",
			zap.Uint32("shortage", shortage),
		)

		// TODO: send task to replicator
	}
}