137 lines
4.7 KiB
Go
137 lines
4.7 KiB
Go
package policer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// errInvalidECPlacement is returned when the placement built for an object of
// an EC container does not consist of exactly one placement vector.
var errInvalidECPlacement = errors.New(
	"invalid EC placement: EC placement must have one placement vector",
)
|
|
|
|
func (p *Policer) processECContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
|
if objInfo.ECInfo == nil {
|
|
return p.processECContainerRepObject(ctx, objInfo, policy)
|
|
}
|
|
return p.processECContainerECObject(ctx, objInfo, policy)
|
|
}
|
|
|
|
// processECContainerRepObject processes non erasure coded objects in EC container: tombstones, locks and linking objects.
|
|
// All of them must be stored on all of the container nodes.
|
|
func (p *Policer) processECContainerRepObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
|
objID := objInfo.Address.Object()
|
|
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objID, policy)
|
|
if err != nil {
|
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
|
}
|
|
if len(nn) != 1 {
|
|
return errInvalidECPlacement
|
|
}
|
|
|
|
c := &placementRequirements{}
|
|
checkedNodes := newNodeCache()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
rd := policy.ReplicaDescriptor(0)
|
|
// processNodes replaces rd.GetECDataCount() + rd.GetECParityCount() for len(nn[0]) for locks, tomstones and linking objects.
|
|
p.processRepNodes(ctx, c, objInfo, nn[0], rd.GetECDataCount()+rd.GetECParityCount(), checkedNodes)
|
|
|
|
if !c.needLocalCopy && c.removeLocalCopy {
|
|
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected,
|
|
zap.Stringer("object", objInfo.Address),
|
|
)
|
|
|
|
p.cbRedundantCopy(ctx, objInfo.Address)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Policer) processECContainerECObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
|
|
nn, err := p.placementBuilder.BuildPlacement(objInfo.Address.Container(), &objInfo.ECInfo.ParentID, policy)
|
|
if err != nil {
|
|
return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
|
|
}
|
|
if len(nn) != 1 {
|
|
return errInvalidECPlacement
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
validPlacement := p.processECChunk(ctx, objInfo, nn[0])
|
|
if !validPlacement {
|
|
p.pullRequiredECChunks(ctx, objInfo, nn[0])
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processECChunk replicates EC chunk if needed.
|
|
// Returns True if current chunk should be stored on current node.
|
|
func (p *Policer) processECChunk(ctx context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) bool {
|
|
var removeLocalChunk bool
|
|
requiredNode := nodes[int(objInfo.ECInfo.Index)%(len(nodes))]
|
|
if p.cfg.netmapKeys.IsLocalKey(requiredNode.PublicKey()) {
|
|
// current node is required node, we are happy
|
|
return true
|
|
}
|
|
if requiredNode.IsMaintenance() {
|
|
// consider maintenance mode has object, but do not drop local copy
|
|
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
|
|
return false
|
|
}
|
|
|
|
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
|
_, err := p.remoteHeader(callCtx, requiredNode, objInfo.Address)
|
|
cancel()
|
|
|
|
if err == nil {
|
|
removeLocalChunk = true
|
|
} else if client.IsErrObjectNotFound(err) {
|
|
p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected, zap.Stringer("object", objInfo.Address), zap.Uint32("shortage", 1))
|
|
task := replicator.Task{
|
|
NumCopies: 1,
|
|
Addr: objInfo.Address,
|
|
Nodes: []netmap.NodeInfo{requiredNode},
|
|
}
|
|
p.replicator.HandleReplicationTask(ctx, task, newNodeCache())
|
|
} else if isClientErrMaintenance(err) {
|
|
// consider maintenance mode has object, but do not drop local copy
|
|
p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(requiredNode)))
|
|
} else {
|
|
p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance, zap.Stringer("object", objInfo.Address), zap.String("error", err.Error()))
|
|
}
|
|
|
|
if removeLocalChunk {
|
|
p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address))
|
|
p.cbRedundantCopy(ctx, objInfo.Address)
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (p *Policer) pullRequiredECChunks(_ context.Context, objInfo objectcore.Info, nodes []netmap.NodeInfo) {
|
|
requiredChunkIndexes := make(map[uint32]struct{})
|
|
for i, n := range nodes {
|
|
if p.cfg.netmapKeys.IsLocalKey(n.PublicKey()) {
|
|
requiredChunkIndexes[uint32(i)] = struct{}{}
|
|
}
|
|
}
|
|
if len(requiredChunkIndexes) == 0 {
|
|
p.log.Info(logs.PolicerNodeIsNotContainerNodeForECObject, zap.Stringer("object", objInfo.ECInfo.ParentID))
|
|
return
|
|
}
|
|
}
|