package policer import ( "context" "errors" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" policycore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) func (p *Policer) processObject(ctx context.Context, objInfo objectcore.Info) error { cnr, err := p.cnrSrc.Get(objInfo.Address.Container()) if err != nil { if client.IsErrContainerNotFound(err) { existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, objInfo.Address.Container()) if errWasRemoved != nil { return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved) } else if existed { err := p.buryFn(ctx, objInfo.Address) if err != nil { return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err) } } } return fmt.Errorf("%s: %w", logs.PolicerCouldNotGetContainer, err) } policy := cnr.Value.PlacementPolicy() if policycore.IsECPlacement(policy) { return p.processECContainerObject(ctx, objInfo, policy) } return p.processRepContainerObject(ctx, objInfo, policy) } func (p *Policer) processRepContainerObject(ctx context.Context, objInfo objectcore.Info, policy netmap.PlacementPolicy) error { idObj := objInfo.Address.Object() idCnr := objInfo.Address.Container() nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy) if err != nil { return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err) } c := &placementRequirements{} // cached info about already checked nodes checkedNodes := newNodeCache() for i := range nn { select { case <-ctx.Done(): return ctx.Err() default: } p.processRepNodes(ctx, c, objInfo, nn[i], policy.ReplicaDescriptor(i).NumberOfObjects(), checkedNodes) } if !c.needLocalCopy && c.removeLocalCopy { p.log.Info(logs.PolicerRedundantLocalObjectCopyDetected, zap.Stringer("object", objInfo.Address), ) p.cbRedundantCopy(ctx, objInfo.Address) } return nil } type placementRequirements struct { // needLocalCopy is true if the current node must store an object according to the storage policy. needLocalCopy bool // removeLocalCopy is true if all copies are stored according to the storage policy // and the current node doesn't need to store an object. removeLocalCopy bool } func (p *Policer) processRepNodes(ctx context.Context, requirements *placementRequirements, objInfo objectcore.Info, nodes []netmap.NodeInfo, shortage uint32, checkedNodes nodeCache, ) { addr := objInfo.Address typ := objInfo.Type // Number of copies that are stored on maintenance nodes. var uncheckedCopies int if typ == objectSDK.TypeLock || typ == objectSDK.TypeTombstone || objInfo.IsLinkingObject { // all nodes of a container must store the `LOCK`, `TOMBSTONE` and linking objects // for correct object removal protection: // - `LOCK`, `TOMBSTONE` and linking objects are broadcast on their PUT requests; // - `LOCK` object removal is a prohibited action in the GC. shortage = uint32(len(nodes)) } for i := 0; shortage > 0 && i < len(nodes); i++ { select { case <-ctx.Done(): return default: } if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) { requirements.needLocalCopy = true shortage-- } else if nodes[i].IsMaintenance() { shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies) } else { if status := checkedNodes.processStatus(nodes[i]); status.Processed() { if status == nodeHoldsObject { // node already contains replica, no need to replicate nodes = append(nodes[:i], nodes[i+1:]...) i-- shortage-- } continue } callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) _, err := p.remoteHeader(callCtx, nodes[i], addr, false) cancel() if err == nil { shortage-- checkedNodes.submitReplicaHolder(nodes[i]) } else { if client.IsErrObjectNotFound(err) { checkedNodes.submitReplicaCandidate(nodes[i]) continue } else if isClientErrMaintenance(err) { shortage, uncheckedCopies = p.handleMaintenance(nodes[i], checkedNodes, shortage, uncheckedCopies) } else { p.log.Error(logs.PolicerReceiveObjectHeaderToCheckPolicyCompliance, zap.Stringer("object", addr), zap.String("error", err.Error()), ) } } } nodes = append(nodes[:i], nodes[i+1:]...) i-- } p.handleProcessNodesResult(ctx, addr, requirements, nodes, checkedNodes, shortage, uncheckedCopies) } // handleMaintenance handles node in maintenance mode and returns new shortage and uncheckedCopies values // // consider remote nodes under maintenance as problem OK. Such // nodes MAY not respond with object, however, this is how we // prevent spam with new replicas. // However, additional copies should not be removed in this case, // because we can remove the only copy this way. func (p *Policer) handleMaintenance(node netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int) (uint32, int) { checkedNodes.submitReplicaHolder(node) shortage-- uncheckedCopies++ p.log.Debug(logs.PolicerConsiderNodeUnderMaintenanceAsOK, zap.String("node", netmap.StringifyPublicKey(node)), ) return shortage, uncheckedCopies } func (p *Policer) handleProcessNodesResult(ctx context.Context, addr oid.Address, requirements *placementRequirements, nodes []netmap.NodeInfo, checkedNodes nodeCache, shortage uint32, uncheckedCopies int, ) { switch { case shortage > 0: p.log.Debug(logs.PolicerShortageOfObjectCopiesDetected, zap.Stringer("object", addr), zap.Uint32("shortage", shortage), ) task := replicator.Task{ NumCopies: shortage, Addr: addr, Nodes: nodes, } p.replicator.HandleReplicationTask(ctx, task, checkedNodes) case uncheckedCopies > 0: // If we have more copies than needed, but some of them are from the maintenance nodes, // save the local copy. p.log.Debug(logs.PolicerSomeOfTheCopiesAreStoredOnNodesUnderMaintenance, zap.Int("count", uncheckedCopies)) case uncheckedCopies == 0: // Safe to remove: checked all copies, shortage == 0. requirements.removeLocalCopy = true } } // isClientErrMaintenance checks if err corresponds to FrostFS status return // which tells that node is currently under maintenance. Supports wrapped // errors. // // Similar to client.IsErr___ errors, consider replacing to FrostFS SDK. func isClientErrMaintenance(err error) bool { switch unwrapErr(err).(type) { default: return false case *apistatus.NodeUnderMaintenance: return true } } // unwrapErr unwraps error using errors.Unwrap. func unwrapErr(err error) error { for e := errors.Unwrap(err); e != nil; e = errors.Unwrap(err) { err = e } return err }