diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go deleted file mode 100644 index 194c5188..00000000 --- a/pkg/services/audit/auditor/context.go +++ /dev/null @@ -1,298 +0,0 @@ -package auditor - -import ( - "context" - "sync" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.uber.org/atomic" - "go.uber.org/zap" -) - -// Context represents container data audit execution context. -type Context struct { - ContextPrm - - task *audit.Task - - report *audit.Report - - sgMembersMtx sync.RWMutex - sgMembersCache map[oid.ID][]oid.ID - - placementMtx sync.Mutex - placementCache map[string][][]netmap.NodeInfo - - porRequests, porRetries atomic.Uint32 - - pairs []gamePair - - pairedMtx sync.Mutex - pairedNodes map[uint64]*pairMemberInfo - - counters struct { - hit, miss, fail uint32 - } - - cnrNodesNum int - - headMtx sync.RWMutex - headResponses map[string]shortHeader -} - -type pairMemberInfo struct { - failedPDP, passedPDP bool // at least one - - node netmap.NodeInfo -} - -type gamePair struct { - n1, n2 netmap.NodeInfo - - id oid.ID - - rn1, rn2 []*object.Range - - hh1, hh2 [][]byte -} - -type shortHeader struct { - tzhash []byte - - objectSize uint64 -} - -// ContextPrm groups components required to conduct data audit checks. -type ContextPrm struct { - maxPDPSleep uint64 - - log *logger.Logger - - cnrCom ContainerCommunicator - - pdpWorkerPool, porWorkerPool util.WorkerPool -} - -type commonCommunicatorPrm struct { - Node netmap.NodeInfo - - OID oid.ID - CID cid.ID -} - -// GetHeaderPrm groups parameter of GetHeader operation. -type GetHeaderPrm struct { - commonCommunicatorPrm - - NodeIsRelay bool -} - -// GetRangeHashPrm groups parameter of GetRangeHash operation. -type GetRangeHashPrm struct { - commonCommunicatorPrm - - Range *object.Range -} - -// ContainerCommunicator is an interface of -// component of communication with container nodes. -type ContainerCommunicator interface { - // GetHeader must return object header from the container node. - GetHeader(context.Context, GetHeaderPrm) (*object.Object, error) - - // GetRangeHash must return homomorphic Tillich-Zemor hash of payload range of the - // object stored in container node. - GetRangeHash(context.Context, GetRangeHashPrm) ([]byte, error) -} - -// NewContext creates, initializes and returns Context. -func NewContext(prm ContextPrm) *Context { - return &Context{ - ContextPrm: prm, - } -} - -// SetLogger sets logging component. -func (p *ContextPrm) SetLogger(l *logger.Logger) { - if p != nil { - p.log = l - } -} - -// SetContainerCommunicator sets component of communication with container nodes. -func (p *ContextPrm) SetContainerCommunicator(cnrCom ContainerCommunicator) { - if p != nil { - p.cnrCom = cnrCom - } -} - -// SetMaxPDPSleep sets maximum sleep interval between range hash requests. -// as part of PDP check. -func (p *ContextPrm) SetMaxPDPSleep(dur time.Duration) { - if p != nil { - p.maxPDPSleep = uint64(dur) - } -} - -// WithTask sets container audit parameters. -func (c *Context) WithTask(t *audit.Task) *Context { - if c != nil { - c.task = t - } - - return c -} - -// WithPDPWorkerPool sets worker pool for PDP pairs processing. -func (c *Context) WithPDPWorkerPool(pool util.WorkerPool) *Context { - if c != nil { - c.pdpWorkerPool = pool - } - - return c -} - -// WithPoRWorkerPool sets worker pool for PoR SG processing. -func (c *Context) WithPoRWorkerPool(pool util.WorkerPool) *Context { - if c != nil { - c.porWorkerPool = pool - } - - return c -} - -func (c *Context) containerID() cid.ID { - return c.task.ContainerID() -} - -func (c *Context) init() { - c.report = audit.NewReport(c.containerID()) - - c.sgMembersCache = make(map[oid.ID][]oid.ID) - - c.placementCache = make(map[string][][]netmap.NodeInfo) - - cnrVectors := c.task.ContainerNodes() - for i := range cnrVectors { - c.cnrNodesNum += len(cnrVectors[i]) - } - - c.pairedNodes = make(map[uint64]*pairMemberInfo) - - c.headResponses = make(map[string]shortHeader) - - c.log = &logger.Logger{Logger: c.log.With( - zap.Stringer("container ID", c.task.ContainerID()), - )} -} - -func (c *Context) expired(ctx context.Context) bool { - select { - case <-ctx.Done(): - c.log.Debug(logs.AuditorAuditContextIsDone, - zap.String("error", ctx.Err().Error()), - ) - - return true - default: - return false - } -} - -func (c *Context) complete() { - c.report.Complete() -} - -func (c *Context) writeReport() { - c.log.Debug(logs.AuditorWritingAuditReport) - - if err := c.task.Reporter().WriteReport(c.report); err != nil { - c.log.Error(logs.AuditorCouldNotWriteAuditReport) - } -} - -func (c *Context) buildPlacement(id oid.ID) ([][]netmap.NodeInfo, error) { - c.placementMtx.Lock() - defer c.placementMtx.Unlock() - - strID := id.EncodeToString() - - if nn, ok := c.placementCache[strID]; ok { - return nn, nil - } - - nn, err := placement.BuildObjectPlacement( - c.task.NetworkMap(), - c.task.ContainerNodes(), - &id, - ) - if err != nil { - return nil, err - } - - c.placementCache[strID] = nn - - return nn, nil -} - -func (c *Context) objectSize(id oid.ID) uint64 { - c.headMtx.RLock() - defer c.headMtx.RUnlock() - - strID := id.EncodeToString() - - if hdr, ok := c.headResponses[strID]; ok { - return hdr.objectSize - } - - return 0 -} - -func (c *Context) objectHomoHash(id oid.ID) []byte { - c.headMtx.RLock() - defer c.headMtx.RUnlock() - - strID := id.EncodeToString() - - if hdr, ok := c.headResponses[strID]; ok { - return hdr.tzhash - } - - return nil -} - -func (c *Context) updateHeadResponses(hdr *object.Object) { - id, ok := hdr.ID() - if !ok { - return - } - - strID := id.EncodeToString() - cs, _ := hdr.PayloadHomomorphicHash() - - c.headMtx.Lock() - defer c.headMtx.Unlock() - - if _, ok := c.headResponses[strID]; !ok { - c.headResponses[strID] = shortHeader{ - tzhash: cs.Value(), - objectSize: hdr.PayloadSize(), - } - } -} - -func (c *Context) updateSGInfo(id oid.ID, members []oid.ID) { - c.sgMembersMtx.Lock() - defer c.sgMembersMtx.Unlock() - - c.sgMembersCache[id] = members -} diff --git a/pkg/services/audit/auditor/exec.go b/pkg/services/audit/auditor/exec.go deleted file mode 100644 index e603818b..00000000 --- a/pkg/services/audit/auditor/exec.go +++ /dev/null @@ -1,37 +0,0 @@ -package auditor - -import ( - "context" - "fmt" -) - -// Execute audits container data. -func (c *Context) Execute(ctx context.Context, onCompleted func()) { - defer onCompleted() - c.init() - - checks := []struct { - name string - exec func(context.Context) - }{ - {name: "PoR", exec: c.executePoR}, - {name: "PoP", exec: c.executePoP}, - {name: "PDP", exec: c.executePDP}, - } - - for i := range checks { - c.log.Debug(fmt.Sprintf("executing %s check...", checks[i].name)) - - if c.expired(ctx) { - break - } - - checks[i].exec(ctx) - - if i == len(checks)-1 { - c.complete() - } - } - - c.writeReport() -} diff --git a/pkg/services/audit/auditor/pdp.go b/pkg/services/audit/auditor/pdp.go deleted file mode 100644 index d5ad0fea..00000000 --- a/pkg/services/audit/auditor/pdp.go +++ /dev/null @@ -1,240 +0,0 @@ -package auditor - -import ( - "bytes" - "context" - "sync" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "git.frostfs.info/TrueCloudLab/tzhash/tz" - "go.uber.org/zap" -) - -func (c *Context) executePDP(ctx context.Context) { - c.processPairs(ctx) - c.writePairsResult() -} - -func (c *Context) processPairs(ctx context.Context) { - wg := new(sync.WaitGroup) - - for i := range c.pairs { - p := &c.pairs[i] - wg.Add(1) - - if err := c.pdpWorkerPool.Submit(func() { - c.processPair(ctx, p) - wg.Done() - }); err != nil { - wg.Done() - } - } - - wg.Wait() - c.pdpWorkerPool.Release() -} - -func (c *Context) processPair(ctx context.Context, p *gamePair) { - c.distributeRanges(p) - c.collectHashes(ctx, p) - c.analyzeHashes(p) -} - -func (c *Context) distributeRanges(p *gamePair) { - p.rn1 = make([]*object.Range, hashRangeNumber-1) - p.rn2 = make([]*object.Range, hashRangeNumber-1) - - for i := 0; i < hashRangeNumber-1; i++ { - p.rn1[i] = object.NewRange() - p.rn2[i] = object.NewRange() - } - - notches := c.splitPayload(p.id) - - { // node 1 - // [0:n2] - p.rn1[0].SetLength(notches[1]) - - // [n2:n3] - p.rn1[1].SetOffset(notches[1]) - p.rn1[1].SetLength(notches[2] - notches[1]) - - // [n3:full] - p.rn1[2].SetOffset(notches[2]) - p.rn1[2].SetLength(notches[3] - notches[2]) - } - - { // node 2 - // [0:n1] - p.rn2[0].SetLength(notches[0]) - - // [n1:n2] - p.rn2[1].SetOffset(notches[0]) - p.rn2[1].SetLength(notches[1] - notches[0]) - - // [n2:full] - p.rn2[2].SetOffset(notches[1]) - p.rn2[2].SetLength(notches[3] - notches[1]) - } -} - -func (c *Context) splitPayload(id oid.ID) []uint64 { - var ( - prev uint64 - size = c.objectSize(id) - notches = make([]uint64, 0, hashRangeNumber) - ) - - for i := uint64(0); i < hashRangeNumber; i++ { - if i < hashRangeNumber-1 { - max := size - prev - (hashRangeNumber - i) - if max == 0 { - prev++ - } else { - prev += rand.Uint64()%max + 1 - } - } else { - prev = size - } - - notches = append(notches, prev) - } - - return notches -} - -func (c *Context) collectHashes(ctx context.Context, p *gamePair) { - fn := func(n netmap.NodeInfo, rngs []*object.Range) [][]byte { - // Here we randomize the order a bit: the hypothesis is that this - // makes it harder for an unscrupulous node to come up with a - // reliable cheating strategy. - order := make([]int, len(rngs)) - for i := range order { - order[i] = i - } - rand.Shuffle(len(order), func(i, j int) { order[i], order[j] = order[j], order[i] }) - - var getRangeHashPrm GetRangeHashPrm - getRangeHashPrm.CID = c.task.ContainerID() - getRangeHashPrm.OID = p.id - getRangeHashPrm.Node = n - - res := make([][]byte, len(rngs)) - for _, i := range order { - var sleepDur time.Duration - if c.maxPDPSleep > 0 { - sleepDur = time.Duration(rand.Uint64() % c.maxPDPSleep) - } - - c.log.Debug(logs.AuditorSleepBeforeGetRangeHash, - zap.Stringer("interval", sleepDur), - ) - - time.Sleep(sleepDur) - - getRangeHashPrm.Range = rngs[i] - - h, err := c.cnrCom.GetRangeHash(ctx, getRangeHashPrm) - if err != nil { - c.log.Debug(logs.AuditorCouldNotGetPayloadRangeHash, - zap.Stringer("id", p.id), - zap.String("node", netmap.StringifyPublicKey(n)), - zap.String("error", err.Error()), - ) - return res - } - res[i] = h - } - return res - } - - p.hh1 = fn(p.n1, p.rn1) - p.hh2 = fn(p.n2, p.rn2) -} - -func (c *Context) analyzeHashes(p *gamePair) { - if len(p.hh1) != hashRangeNumber-1 || len(p.hh2) != hashRangeNumber-1 { - c.failNodesPDP(p.n1, p.n2) - return - } - - h1, err := tz.Concat([][]byte{p.hh2[0], p.hh2[1]}) - if err != nil || !bytes.Equal(p.hh1[0], h1) { - c.failNodesPDP(p.n1, p.n2) - return - } - - h2, err := tz.Concat([][]byte{p.hh1[1], p.hh1[2]}) - if err != nil || !bytes.Equal(p.hh2[2], h2) { - c.failNodesPDP(p.n1, p.n2) - return - } - - fh, err := tz.Concat([][]byte{h1, h2}) - if err != nil || !bytes.Equal(fh, c.objectHomoHash(p.id)) { - c.failNodesPDP(p.n1, p.n2) - return - } - - c.passNodesPDP(p.n1, p.n2) -} - -func (c *Context) failNodesPDP(ns ...netmap.NodeInfo) { - c.pairedMtx.Lock() - - for i := range ns { - c.pairedNodes[ns[i].Hash()].failedPDP = true - } - - c.pairedMtx.Unlock() -} - -func (c *Context) passNodesPDP(ns ...netmap.NodeInfo) { - c.pairedMtx.Lock() - - for i := range ns { - c.pairedNodes[ns[i].Hash()].passedPDP = true - } - - c.pairedMtx.Unlock() -} - -func (c *Context) writePairsResult() { - var failCount, okCount int - - c.iteratePairedNodes( - func(netmap.NodeInfo) { failCount++ }, - func(netmap.NodeInfo) { okCount++ }, - ) - - failedNodes := make([][]byte, 0, failCount) - passedNodes := make([][]byte, 0, okCount) - - c.iteratePairedNodes( - func(n netmap.NodeInfo) { - failedNodes = append(failedNodes, n.PublicKey()) - }, - func(n netmap.NodeInfo) { - passedNodes = append(passedNodes, n.PublicKey()) - }, - ) - - c.report.SetPDPResults(passedNodes, failedNodes) -} - -func (c *Context) iteratePairedNodes(onFail, onPass func(netmap.NodeInfo)) { - for _, pairedNode := range c.pairedNodes { - if pairedNode.failedPDP { - onFail(pairedNode.node) - } - - if pairedNode.passedPDP { - onPass(pairedNode.node) - } - } -} diff --git a/pkg/services/audit/auditor/pop.go b/pkg/services/audit/auditor/pop.go deleted file mode 100644 index b64004bb..00000000 --- a/pkg/services/audit/auditor/pop.go +++ /dev/null @@ -1,187 +0,0 @@ -package auditor - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "git.frostfs.info/TrueCloudLab/tzhash/tz" - "go.uber.org/zap" -) - -const ( - hashRangeNumber = 4 - minGamePayloadSize = hashRangeNumber * tz.Size -) - -func (c *Context) executePoP(ctx context.Context) { - c.buildCoverage(ctx) - - c.report.SetPlacementCounters( - c.counters.hit, - c.counters.miss, - c.counters.fail, - ) -} - -func (c *Context) buildCoverage(ctx context.Context) { - policy := c.task.ContainerStructure().PlacementPolicy() - - // select random member from another storage group - // and process all placement vectors - c.iterateSGMembersPlacementRand(func(id oid.ID, ind int, nodes []netmap.NodeInfo) bool { - c.processObjectPlacement(ctx, id, nodes, policy.ReplicaNumberByIndex(ind)) - return c.containerCovered() - }) -} - -func (c *Context) containerCovered() bool { - // number of container nodes can be calculated once - return c.cnrNodesNum <= len(c.pairedNodes) -} - -func (c *Context) processObjectPlacement(ctx context.Context, id oid.ID, nodes []netmap.NodeInfo, replicas uint32) { - var ( - ok uint32 - optimal bool - - unpairedCandidate1, unpairedCandidate2 = -1, -1 - - pairedCandidate = -1 - ) - - var getHeaderPrm GetHeaderPrm - getHeaderPrm.OID = id - getHeaderPrm.CID = c.task.ContainerID() - getHeaderPrm.NodeIsRelay = false - - for i := 0; ok < replicas && i < len(nodes); i++ { - getHeaderPrm.Node = nodes[i] - - // try to get object header from node - hdr, err := c.cnrCom.GetHeader(ctx, getHeaderPrm) - if err != nil { - c.log.Debug(logs.AuditorCouldNotGetObjectHeaderFromCandidate, - zap.Stringer("id", id), - zap.String("error", err.Error()), - ) - - continue - } - - c.updateHeadResponses(hdr) - - // increment success counter - ok++ - - // update optimal flag - optimal = ok == replicas && uint32(i) < replicas - - // exclude small objects from coverage - if c.objectSize(id) < minGamePayloadSize { - continue - } - - // update potential candidates to be paired - if _, ok := c.pairedNodes[nodes[i].Hash()]; !ok { - if unpairedCandidate1 < 0 { - unpairedCandidate1 = i - } else if unpairedCandidate2 < 0 { - unpairedCandidate2 = i - } - } else if pairedCandidate < 0 { - pairedCandidate = i - } - } - - if optimal { - c.counters.hit++ - } else if ok == replicas { - c.counters.miss++ - } else { - c.counters.fail++ - } - - if unpairedCandidate1 >= 0 { - if unpairedCandidate2 >= 0 { - c.composePair(id, nodes[unpairedCandidate1], nodes[unpairedCandidate2]) - } else if pairedCandidate >= 0 { - c.composePair(id, nodes[unpairedCandidate1], nodes[pairedCandidate]) - } - } -} - -func (c *Context) composePair(id oid.ID, n1, n2 netmap.NodeInfo) { - c.pairs = append(c.pairs, gamePair{ - n1: n1, - n2: n2, - id: id, - }) - - c.pairedNodes[n1.Hash()] = &pairMemberInfo{ - node: n1, - } - c.pairedNodes[n2.Hash()] = &pairMemberInfo{ - node: n2, - } -} - -func (c *Context) iterateSGMembersPlacementRand(f func(oid.ID, int, []netmap.NodeInfo) bool) { - // iterate over storage groups members for all storage groups (one by one) - // with randomly shuffled members - c.iterateSGMembersRand(func(id oid.ID) bool { - // build placement vector for the current object - nn, err := c.buildPlacement(id) - if err != nil { - c.log.Debug(logs.AuditorCouldNotBuildPlacementForObject, - zap.Stringer("id", id), - zap.String("error", err.Error()), - ) - - return false - } - - for i, nodes := range nn { - if f(id, i, nodes) { - return true - } - } - - return false - }) -} - -func (c *Context) iterateSGMembersRand(f func(oid.ID) bool) { - c.iterateSGInfo(func(members []oid.ID) bool { - ln := len(members) - - processed := make(map[uint64]struct{}, ln-1) - - for len(processed) < ln { - ind := nextRandUint64(uint64(ln), processed) - processed[ind] = struct{}{} - - if f(members[ind]) { - return true - } - } - - return false - }) -} - -func (c *Context) iterateSGInfo(f func([]oid.ID) bool) { - c.sgMembersMtx.RLock() - defer c.sgMembersMtx.RUnlock() - - // we can add randomization like for SG members, - // but list of storage groups is already expected - // to be shuffled since it is a Search response - // with unpredictable order - for i := range c.sgMembersCache { - if f(c.sgMembersCache[i]) { - return - } - } -} diff --git a/pkg/services/audit/auditor/por.go b/pkg/services/audit/auditor/por.go deleted file mode 100644 index d579b3a7..00000000 --- a/pkg/services/audit/auditor/por.go +++ /dev/null @@ -1,156 +0,0 @@ -package auditor - -import ( - "bytes" - "context" - "sync" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand" - containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - storagegroupSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/storagegroup" - "git.frostfs.info/TrueCloudLab/tzhash/tz" - "go.uber.org/zap" -) - -func (c *Context) executePoR(ctx context.Context) { - wg := new(sync.WaitGroup) - sgs := c.task.StorageGroupList() - - for _, sg := range sgs { - wg.Add(1) - - if err := c.porWorkerPool.Submit(func() { - c.checkStorageGroupPoR(ctx, sg.ID(), sg.StorageGroup()) - wg.Done() - }); err != nil { - wg.Done() - } - } - - wg.Wait() - c.porWorkerPool.Release() - - c.report.SetPoRCounters(c.porRequests.Load(), c.porRetries.Load()) -} - -func (c *Context) checkStorageGroupPoR(ctx context.Context, sgID oid.ID, sg storagegroupSDK.StorageGroup) { - members := sg.Members() - c.updateSGInfo(sgID, members) - - var ( - tzHash []byte - totalSize uint64 - - accRequests, accRetries uint32 - ) - - var getHeaderPrm GetHeaderPrm - getHeaderPrm.CID = c.task.ContainerID() - getHeaderPrm.NodeIsRelay = true - - homomorphicHashingEnabled := !containerSDK.IsHomomorphicHashingDisabled(c.task.ContainerStructure()) - - for i := range members { - flat, ok := c.getShuffledNodes(members[i], sgID) - if !ok { - continue - } - - getHeaderPrm.OID = members[i] - - for j := range flat { - accRequests++ - if j > 0 { // in best case audit get object header on first iteration - accRetries++ - } - - getHeaderPrm.Node = flat[j] - - hdr, err := c.cnrCom.GetHeader(ctx, getHeaderPrm) - if err != nil { - c.log.Debug(logs.AuditorCantHeadObject, - zap.String("remote_node", netmap.StringifyPublicKey(flat[j])), - zap.Stringer("oid", members[i]), - ) - - continue - } - - // update cache for PoR and PDP audit checks - c.updateHeadResponses(hdr) - - if homomorphicHashingEnabled { - cs, _ := hdr.PayloadHomomorphicHash() - if len(tzHash) == 0 { - tzHash = cs.Value() - } else { - tzHash, err = tz.Concat([][]byte{ - tzHash, - cs.Value(), - }) - if err != nil { - c.log.Debug(logs.AuditorCantConcatenateTzHash, - zap.String("oid", members[i].String()), - zap.String("error", err.Error())) - - break - } - } - } - - totalSize += hdr.PayloadSize() - - break - } - } - - c.porRequests.Add(accRequests) - c.porRetries.Add(accRetries) - - sizeCheck := sg.ValidationDataSize() == totalSize - cs, _ := sg.ValidationDataHash() - tzCheck := !homomorphicHashingEnabled || bytes.Equal(tzHash, cs.Value()) - - c.writeCheckReport(sizeCheck, tzCheck, sgID, sg, totalSize) -} - -func (c *Context) writeCheckReport(sizeCheck, tzCheck bool, sgID oid.ID, sg storagegroupSDK.StorageGroup, totalSize uint64) { - if sizeCheck && tzCheck { - c.report.PassedPoR(sgID) - } else { - if !sizeCheck { - c.log.Debug(logs.AuditorStorageGroupSizeCheckFailed, - zap.Uint64("expected", sg.ValidationDataSize()), - zap.Uint64("got", totalSize)) - } - - if !tzCheck { - c.log.Debug(logs.AuditorStorageGroupTzHashCheckFailed) - } - - c.report.FailedPoR(sgID) - } -} - -func (c *Context) getShuffledNodes(member oid.ID, sgID oid.ID) ([]netmap.NodeInfo, bool) { - objectPlacement, err := c.buildPlacement(member) - if err != nil { - c.log.Info(logs.AuditorCantBuildPlacementForStorageGroupMember, - zap.Stringer("sg", sgID), - zap.String("member_id", member.String()), - ) - - return nil, false - } - - flat := placement.FlattenNodes(objectPlacement) - - rand.Shuffle(len(flat), func(i, j int) { - flat[i], flat[j] = flat[j], flat[i] - }) - return flat, true -} diff --git a/pkg/services/audit/auditor/util.go b/pkg/services/audit/auditor/util.go deleted file mode 100644 index 5f868553..00000000 --- a/pkg/services/audit/auditor/util.go +++ /dev/null @@ -1,18 +0,0 @@ -package auditor - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand" -) - -// nextRandUint64 returns random uint64 number [0; n) outside exclude map. -// Panics if len(exclude) >= n. -func nextRandUint64(n uint64, exclude map[uint64]struct{}) uint64 { - ln := uint64(len(exclude)) - ind := rand.Uint64() % (n - ln) - - for i := ind; ; i++ { - if _, ok := exclude[i]; !ok { - return i - } - } -} diff --git a/pkg/services/audit/report.go b/pkg/services/audit/report.go deleted file mode 100644 index f16f9738..00000000 --- a/pkg/services/audit/report.go +++ /dev/null @@ -1,89 +0,0 @@ -package audit - -import ( - "sync" - - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/audit" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" -) - -// Report tracks the progress of auditing container data. -type Report struct { - mu sync.RWMutex - res audit.Result -} - -// Reporter is an interface of the entity that records -// the data audit report. -type Reporter interface { - WriteReport(r *Report) error -} - -// NewReport creates and returns blank Report instance. -func NewReport(cnr cid.ID) *Report { - var rep Report - rep.res.ForContainer(cnr) - - return &rep -} - -// Result forms the structure of the data audit result. -func (r *Report) Result() *audit.Result { - r.mu.RLock() - defer r.mu.RUnlock() - - return &r.res -} - -// Complete completes audit report. -func (r *Report) Complete() { - r.mu.Lock() - defer r.mu.Unlock() - - r.res.Complete() -} - -// PassedPoR updates list of passed storage groups. -func (r *Report) PassedPoR(sg oid.ID) { - r.mu.Lock() - defer r.mu.Unlock() - - r.res.SubmitPassedStorageGroup(sg) -} - -// FailedPoR updates list of failed storage groups. -func (r *Report) FailedPoR(sg oid.ID) { - r.mu.Lock() - defer r.mu.Unlock() - - r.res.SubmitFailedStorageGroup(sg) -} - -// SetPlacementCounters sets counters of compliance with placement. -func (r *Report) SetPlacementCounters(hit, miss, fail uint32) { - r.mu.Lock() - defer r.mu.Unlock() - - r.res.SetHits(hit) - r.res.SetMisses(miss) - r.res.SetFailures(fail) -} - -// SetPDPResults sets lists of nodes according to their PDP results. -func (r *Report) SetPDPResults(passed, failed [][]byte) { - r.mu.Lock() - defer r.mu.Unlock() - - r.res.SubmitPassedStorageNodes(passed) - r.res.SubmitFailedStorageNodes(failed) -} - -// SetPoRCounters sets amounts of head requests and retries at PoR audit stage. -func (r *Report) SetPoRCounters(requests, retries uint32) { - r.mu.Lock() - defer r.mu.Unlock() - - r.res.SetRequestsPoR(requests) - r.res.SetRetriesPoR(retries) -} diff --git a/pkg/services/audit/task.go b/pkg/services/audit/task.go deleted file mode 100644 index 3de5ac2c..00000000 --- a/pkg/services/audit/task.go +++ /dev/null @@ -1,120 +0,0 @@ -package audit - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" -) - -// Task groups groups the container audit parameters. -type Task struct { - cancelCh <-chan struct{} - - reporter Reporter - - idCnr cid.ID - - cnr container.Container - - nm *netmap.NetMap - - cnrNodes [][]netmap.NodeInfo - - sgList []storagegroup.StorageGroup -} - -// WithReporter sets audit report writer. -func (t *Task) WithReporter(r Reporter) *Task { - if t != nil { - t.reporter = r - } - - return t -} - -// Reporter returns audit report writer. -func (t *Task) Reporter() Reporter { - return t.reporter -} - -func (t *Task) WithCancelChannel(ch <-chan struct{}) *Task { - if ch != nil { - t.cancelCh = ch - } - return t -} - -func (t *Task) CancelChannel() <-chan struct{} { - return t.cancelCh -} - -// WithContainerID sets identifier of the container under audit. -func (t *Task) WithContainerID(cnr cid.ID) *Task { - if t != nil { - t.idCnr = cnr - } - - return t -} - -// ContainerID returns identifier of the container under audit. -func (t *Task) ContainerID() cid.ID { - return t.idCnr -} - -// WithContainerStructure sets structure of the container under audit. -func (t *Task) WithContainerStructure(cnr container.Container) *Task { - if t != nil { - t.cnr = cnr - } - - return t -} - -// ContainerStructure returns structure of the container under audit. -func (t *Task) ContainerStructure() container.Container { - return t.cnr -} - -// WithContainerNodes sets nodes in the container under audit. -func (t *Task) WithContainerNodes(cnrNodes [][]netmap.NodeInfo) *Task { - if t != nil { - t.cnrNodes = cnrNodes - } - - return t -} - -// NetworkMap returns network map of audit epoch. -func (t *Task) NetworkMap() *netmap.NetMap { - return t.nm -} - -// WithNetworkMap sets network map of audit epoch. -func (t *Task) WithNetworkMap(nm *netmap.NetMap) *Task { - if t != nil { - t.nm = nm - } - - return t -} - -// ContainerNodes returns nodes in the container under audit. -func (t *Task) ContainerNodes() [][]netmap.NodeInfo { - return t.cnrNodes -} - -// WithStorageGroupList sets a list of storage groups from container under audit. -func (t *Task) WithStorageGroupList(sgList []storagegroup.StorageGroup) *Task { - if t != nil { - t.sgList = sgList - } - - return t -} - -// StorageGroupList returns list of storage groups from container under audit. -func (t *Task) StorageGroupList() []storagegroup.StorageGroup { - return t.sgList -} diff --git a/pkg/services/audit/taskmanager/listen.go b/pkg/services/audit/taskmanager/listen.go deleted file mode 100644 index bfc37c2a..00000000 --- a/pkg/services/audit/taskmanager/listen.go +++ /dev/null @@ -1,85 +0,0 @@ -package audittask - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/auditor" - "go.uber.org/zap" -) - -// Listen starts the process of processing tasks from the queue. -// -// The listener is terminated by context. -func (m *Manager) Listen(ctx context.Context) { - m.log.Info(logs.TaskmanagerProcessRoutine, - zap.Uint32("queue_capacity", m.queueCap), - ) - - m.ch = make(chan *audit.Task, m.queueCap) - - for { - select { - case <-ctx.Done(): - m.log.Warn(logs.TaskmanagerStopListenerByContext, - zap.String("error", ctx.Err().Error()), - ) - m.workerPool.Release() - - return - case task, ok := <-m.ch: - if !ok { - m.log.Warn(logs.TaskmanagerQueueChannelIsClosed) - return - } - - tCtx, tCancel := context.WithCancel(ctx) // cancel task in case of listen cancel - go func() { - select { - case <-tCtx.Done(): // listen cancelled or task completed - return - case <-task.CancelChannel(): // new epoch - tCancel() - } - }() - - m.handleTask(tCtx, task, tCancel) - } - } -} - -func (m *Manager) handleTask(ctx context.Context, task *audit.Task, onCompleted func()) { - pdpPool, err := m.pdpPoolGenerator() - if err != nil { - m.log.Error(logs.TaskmanagerCouldNotGeneratePDPWorkerPool, - zap.String("error", err.Error()), - ) - onCompleted() - return - } - - porPool, err := m.pdpPoolGenerator() - if err != nil { - m.log.Error(logs.TaskmanagerCouldNotGeneratePoRWorkerPool, - zap.String("error", err.Error()), - ) - onCompleted() - return - } - - auditContext := m.generateContext(task). - WithPDPWorkerPool(pdpPool). - WithPoRWorkerPool(porPool) - - if err := m.workerPool.Submit(func() { auditContext.Execute(ctx, onCompleted) }); err != nil { - // may be we should report it - m.log.Warn(logs.TaskmanagerCouldNotSubmitAuditTask) - onCompleted() - } -} - -func (m *Manager) generateContext(task *audit.Task) *auditor.Context { - return auditor.NewContext(m.ctxPrm). - WithTask(task) -} diff --git a/pkg/services/audit/taskmanager/manager.go b/pkg/services/audit/taskmanager/manager.go deleted file mode 100644 index bf769879..00000000 --- a/pkg/services/audit/taskmanager/manager.go +++ /dev/null @@ -1,107 +0,0 @@ -package audittask - -import ( - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/auditor" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "go.uber.org/zap" -) - -// Manager represents an entity performing data audit tasks. -type Manager struct { - *cfg - - ch chan *audit.Task -} - -// Option is a Manager's constructor option. -type Option func(*cfg) - -type cfg struct { - queueCap uint32 - - log *logger.Logger - - ctxPrm auditor.ContextPrm - - workerPool util.WorkerPool - - pdpPoolGenerator, porPoolGenerator func() (util.WorkerPool, error) -} - -func defaultCfg() *cfg { - return &cfg{ - log: &logger.Logger{Logger: zap.L()}, - } -} - -// New creates, initializes and returns new Manager instance. -func New(opts ...Option) *Manager { - c := defaultCfg() - - for i := range opts { - opts[i](c) - } - - return &Manager{ - cfg: c, - } -} - -// WithLogger returns option to specify Manager's logger. -func WithLogger(l *logger.Logger) Option { - return func(c *cfg) { - c.log = &logger.Logger{Logger: l.With(zap.String("component", "Audit task manager"))} - c.ctxPrm.SetLogger(l) - } -} - -// WithWorkerPool returns option to set worker pool -// for task execution. -func WithWorkerPool(p util.WorkerPool) Option { - return func(c *cfg) { - c.workerPool = p - } -} - -// WithQueueCapacity returns option to set task queue capacity. -func WithQueueCapacity(capacity uint32) Option { - return func(c *cfg) { - c.queueCap = capacity - } -} - -// WithContainerCommunicator returns option to set component of communication -// with container nodes. -func WithContainerCommunicator(cnrCom auditor.ContainerCommunicator) Option { - return func(c *cfg) { - c.ctxPrm.SetContainerCommunicator(cnrCom) - } -} - -// WithMaxPDPSleepInterval returns option to set maximum sleep interval -// between range hash requests as part of PDP check. -func WithMaxPDPSleepInterval(dur time.Duration) Option { - return func(c *cfg) { - c.ctxPrm.SetMaxPDPSleep(dur) - } -} - -// WithPDPWorkerPoolGenerator returns option to set worker pool for PDP pairs processing. -// Callback caller owns returned pool and must release it appropriately. -func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { - return func(c *cfg) { - c.pdpPoolGenerator = f - } -} - -// WithPoRWorkerPoolGenerator returns option to set worker pool for PoR SG processing. -// Callback caller owns returned pool and must release it appropriately. -func WithPoRWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option { - return func(c *cfg) { - c.porPoolGenerator = f - } -} diff --git a/pkg/services/audit/taskmanager/push.go b/pkg/services/audit/taskmanager/push.go deleted file mode 100644 index 805897db..00000000 --- a/pkg/services/audit/taskmanager/push.go +++ /dev/null @@ -1,10 +0,0 @@ -package audittask - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit" -) - -// PushTask adds a task to the queue for processing. -func (m *Manager) PushTask(t *audit.Task) { - m.ch <- t -} diff --git a/pkg/services/audit/taskmanager/reset.go b/pkg/services/audit/taskmanager/reset.go deleted file mode 100644 index 86f2538c..00000000 --- a/pkg/services/audit/taskmanager/reset.go +++ /dev/null @@ -1,11 +0,0 @@ -package audittask - -// Reset pops all tasks from the queue. -// Returns amount of popped elements. -func (m *Manager) Reset() (popped int) { - for ; len(m.ch) > 0; popped++ { - <-m.ch - } - - return -}