package policer

import (
	"context"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	policycore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

// processObject checks a single object against the placement policy of its container
// and dispatches it to the REP or EC processing path. Objects whose container has been
// removed are inhumed.
func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "Policer.ProcessObject", trace.WithAttributes(
		attribute.String("address", objInfo.Address.String()),
		attribute.Bool("is_linking_object", objInfo.IsLinkingObject),
		attribute.Bool("is_ec_part", objInfo.ECInfo != nil),
		attribute.String("type", objInfo.Type.String()),
	))
	defer span.End()

	cnr, err := p.cnrSrc.Get(objInfo.Address.Container())
	if err != nil {
		if client.IsErrContainerNotFound(err) {
			existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, objInfo.Address.Container())
			if errWasRemoved != nil {
				return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved)
			} else if existed {
				err := p.buryFn(ctx, objInfo.Address)
				if err != nil {
					return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err)
				}
			}
		}

		return fmt.Errorf("%s: %w", logs.PolicerCouldNotGetContainer, err)
	}

	policy := cnr.Value.PlacementPolicy()

	if policycore.IsECPlacement(policy) {
		return p.processECContainerObject(ctx, objInfo, cnr.Value)
	}
	return p.processRepContainerObject(ctx, objInfo, policy)
}

// processRepContainerObject checks policy compliance of an object stored in a container
// with a REP placement policy and removes the redundant local copy if it is no longer needed.
func (p *Policer) processRepContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error {
	idObj := objInfo.Address.Object()
	idCnr := objInfo.Address.Container()
	nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
	if err != nil {
		return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err)
	}

	c := &placementRequirements{}

	// cached info about already checked nodes
	checkedNodes := newNodeCache()

	for i := range nn {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		shortage := policy.ReplicaDescriptor(i).NumberOfObjects()
		if objInfo.Type == objectSDK.TypeLock || objInfo.Type == objectSDK.TypeTombstone || objInfo.IsLinkingObject {
			// all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects
			// for correct object removal protection:
			// - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests;
			// - `LOCK` object removal is a prohibited action in the GC.
			shortage = uint32(len(nn[i]))
		}

		p.processRepNodes(ctx, c, objInfo, nn[i], shortage, checkedNodes)
	}

	if !c.needLocalCopy && c.removeLocalCopy {
		p.log.Info(ctx, logs.PolicerRedundantLocalObjectCopyDetected,
			zap.Stringer("object", objInfo.Address),
		)

		p.cbRedundantCopy(ctx, objInfo.Address)
	}
	return nil
}
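// requiredCopies is a minimal illustrative sketch and is not part of the original file:
// it restates the shortage rule used in processRepContainerObject above. LOCK, TOMBSTONE
// and linking objects are broadcast on PUT and must be present on every node of the
// placement vector, while regular objects need only as many copies as the replica
// descriptor demands. The helper name and signature are assumptions introduced for
// illustration only.
func requiredCopies(objInfo objectcore.Info, descriptorCopies uint32, vectorLen int) uint32 {
	if objInfo.Type == objectSDK.TypeLock || objInfo.Type == objectSDK.TypeTombstone || objInfo.IsLinkingObject {
		// broadcast objects: one copy per node of the placement vector
		return uint32(vectorLen)
	}
	// regular objects: follow the replica descriptor of the placement policy
	return descriptorCopies
}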
type placementRequirements struct {
	// needLocalCopy is true if the current node must store an object according to the storage policy.
	needLocalCopy bool
	// removeLocalCopy is true if all copies are stored according to the storage policy
	// and the current node doesn't need to store an object.
	removeLocalCopy bool
}

// processRepNodes checks which nodes of a single placement vector hold the object,
// records the results in checkedNodes and hands the outcome over to handleProcessNodesResult.
func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRequirements, objInfo objectcore.Info,
	nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache,
) {
	addr := objInfo.Address

	// Number of copies that are stored on maintenance nodes.
	var uncheckedCopies int

	for i := 0; shortage > 0 && i < len(nodes); i++ {
		select {
		case <-ctx.Done():
			return
		default:
		}

		if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) {
			requirements.needLocalCopy = true

			shortage--
		} else if nodes[i].Status().IsMaintenance() {
			shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
		} else {
			if status := checkedNodes.processStatus(nodes[i]); status.Processed() {
				if status == nodeHoldsObject {
					// node already contains replica, no need to replicate
					nodes = append(nodes[:i], nodes[i+1:]...)
					i--
					shortage--
				}

				continue
			}

			callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)

			_, err := p.remoteHeader(callCtx, nodes[i], addr, false)

			cancel()

			if err == nil {
				shortage--
				checkedNodes.submitReplicaHolder(nodes[i])
			} else {
				if client.IsErrObjectNotFound(err) {
					checkedNodes.submitReplicaCandidate(nodes[i])
					continue
				} else if client.IsErrNodeUnderMaintenance(err) {
					shortage, uncheckedCopies = p.handleMaintenance(ctx, nodes[i], checkedNodes, shortage, uncheckedCopies)
				} else {
					p.log.Error(ctx, logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance,
						zap.Stringer("object", addr),
						zap.String("error", err.Error()),
					)
				}
			}
		}

		nodes = append(nodes[:i], nodes[i+1:]...)
		i--
	}

	p.handleProcessNodesResult(ctx, addr, requirements, nodes, checkedNodes, shortage, uncheckedCopies)
}

// handleMaintenance handles a node in maintenance mode and returns the updated shortage
// and uncheckedCopies values.
//
// A remote node under maintenance is counted as a replica holder: such a node MAY not
// respond with the object, but treating it as OK prevents spamming the network with new
// replicas. However, additional copies should not be removed in this case, because the
// only copy could be removed this way.
func (p *Policer) handleMaintenance(ctx context.Context, node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) {
	checkedNodes.submitReplicaHolder(node)
	shortage--
	uncheckedCopies++

	p.log.Debug(ctx, logs.PolicerConsiderNodeUnderMaintenanceAsOK,
		zap.String("node", netmap.StringifyPublicKey(node)),
	)
	return shortage, uncheckedCopies
}

// handleProcessNodesResult decides what to do after a placement vector has been processed:
// replicate missing copies, keep the local copy while some holders are under maintenance,
// or mark the local copy as removable once every required copy is confirmed.
func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements,
	nodes []netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int,
) {
	switch {
	case shortage > 0:
		p.log.Debug(ctx, logs.PolicerShortageOfObjectCopiesDetected,
			zap.Stringer("object", addr),
			zap.Uint32("shortage", shortage),
		)

		task := replicator.Task{
			NumCopies: shortage,
			Addr:      addr,
			Nodes:     nodes,
		}

		p.replicator.HandleReplicationTask(ctx, task, checkedNodes)

	case uncheckedCopies > 0:
		// If we have more copies than needed, but some of them are from the maintenance nodes,
		// save the local copy.
		p.log.Debug(ctx, logs.PolicerSomeOfTheCopiesAreStoredOnNodesUnderMaintenance,
			zap.Int("count", uncheckedCopies))

	case uncheckedCopies == 0:
		// Safe to remove: all copies were checked and shortage == 0.
		requirements.removeLocalCopy = true
	}
}
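// placementOutcome is a minimal illustrative sketch and is not part of the original file:
// it restates the decision table of handleProcessNodesResult as a pure function. The
// helper name and its return values are assumptions introduced for readability only:
// a positive shortage triggers replication, unverified maintenance holders keep the
// local copy, and only a fully confirmed placement marks the local copy as removable.
func placementOutcome(shortage uint32, uncheckedCopies int) string {
	switch {
	case shortage > 0:
		return "replicate missing copies"
	case uncheckedCopies > 0:
		return "keep local copy: some holders are under maintenance and unverified"
	default:
		return "all copies confirmed: local copy may be redundant"
	}
}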