fix/355-increase-tree-service-client-cache-size #359

Closed
ale64bit wants to merge 156 commits from ale64bit/frostfs-node:fix/355-increase-tree-service-client-cache-size into support/v0.36
12 changed files with 0 additions and 1358 deletions
Showing only changes of commit 7f49f07255


@@ -1,298 +0,0 @@
package auditor
import (
"context"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/atomic"
"go.uber.org/zap"
)
// Context represents container data audit execution context.
type Context struct {
ContextPrm
task *audit.Task
report *audit.Report
sgMembersMtx sync.RWMutex
sgMembersCache map[oid.ID][]oid.ID
placementMtx sync.Mutex
placementCache map[string][][]netmap.NodeInfo
porRequests, porRetries atomic.Uint32
pairs []gamePair
pairedMtx sync.Mutex
pairedNodes map[uint64]*pairMemberInfo
counters struct {
hit, miss, fail uint32
}
cnrNodesNum int
headMtx sync.RWMutex
headResponses map[string]shortHeader
}
type pairMemberInfo struct {
failedPDP, passedPDP bool // set when the node failed/passed at least one PDP game
node netmap.NodeInfo
}
type gamePair struct {
n1, n2 netmap.NodeInfo
id oid.ID
rn1, rn2 []*object.Range
hh1, hh2 [][]byte
}
type shortHeader struct {
tzhash []byte
objectSize uint64
}
// ContextPrm groups components required to conduct data audit checks.
type ContextPrm struct {
maxPDPSleep uint64
log *logger.Logger
cnrCom ContainerCommunicator
pdpWorkerPool, porWorkerPool util.WorkerPool
}
type commonCommunicatorPrm struct {
Node netmap.NodeInfo
OID oid.ID
CID cid.ID
}
// GetHeaderPrm groups parameters of the GetHeader operation.
type GetHeaderPrm struct {
commonCommunicatorPrm
NodeIsRelay bool
}
// GetRangeHashPrm groups parameters of the GetRangeHash operation.
type GetRangeHashPrm struct {
commonCommunicatorPrm
Range *object.Range
}
// ContainerCommunicator is an interface of a component
// that communicates with container nodes.
type ContainerCommunicator interface {
// GetHeader must return object header from the container node.
GetHeader(context.Context, GetHeaderPrm) (*object.Object, error)
// GetRangeHash must return the homomorphic Tillich-Zemor hash of a payload
// range of the object stored on the container node.
GetRangeHash(context.Context, GetRangeHashPrm) ([]byte, error)
}
// NewContext creates, initializes and returns a Context.
func NewContext(prm ContextPrm) *Context {
return &Context{
ContextPrm: prm,
}
}
// SetLogger sets logging component.
func (p *ContextPrm) SetLogger(l *logger.Logger) {
if p != nil {
p.log = l
}
}
// SetContainerCommunicator sets component of communication with container nodes.
func (p *ContextPrm) SetContainerCommunicator(cnrCom ContainerCommunicator) {
if p != nil {
p.cnrCom = cnrCom
}
}
// SetMaxPDPSleep sets the maximum sleep interval between range hash requests
// as part of the PDP check.
func (p *ContextPrm) SetMaxPDPSleep(dur time.Duration) {
if p != nil {
p.maxPDPSleep = uint64(dur)
}
}
// WithTask sets container audit parameters.
func (c *Context) WithTask(t *audit.Task) *Context {
if c != nil {
c.task = t
}
return c
}
// WithPDPWorkerPool sets worker pool for PDP pairs processing.
func (c *Context) WithPDPWorkerPool(pool util.WorkerPool) *Context {
if c != nil {
c.pdpWorkerPool = pool
}
return c
}
// WithPoRWorkerPool sets worker pool for PoR SG processing.
func (c *Context) WithPoRWorkerPool(pool util.WorkerPool) *Context {
if c != nil {
c.porWorkerPool = pool
}
return c
}
func (c *Context) containerID() cid.ID {
return c.task.ContainerID()
}
func (c *Context) init() {
c.report = audit.NewReport(c.containerID())
c.sgMembersCache = make(map[oid.ID][]oid.ID)
c.placementCache = make(map[string][][]netmap.NodeInfo)
cnrVectors := c.task.ContainerNodes()
for i := range cnrVectors {
c.cnrNodesNum += len(cnrVectors[i])
}
c.pairedNodes = make(map[uint64]*pairMemberInfo)
c.headResponses = make(map[string]shortHeader)
c.log = &logger.Logger{Logger: c.log.With(
zap.Stringer("container ID", c.task.ContainerID()),
)}
}
func (c *Context) expired(ctx context.Context) bool {
select {
case <-ctx.Done():
c.log.Debug(logs.AuditorAuditContextIsDone,
zap.String("error", ctx.Err().Error()),
)
return true
default:
return false
}
}
func (c *Context) complete() {
c.report.Complete()
}
func (c *Context) writeReport() {
c.log.Debug(logs.AuditorWritingAuditReport)
if err := c.task.Reporter().WriteReport(c.report); err != nil {
c.log.Error(logs.AuditorCouldNotWriteAuditReport,
zap.String("error", err.Error()),
)
}
}
func (c *Context) buildPlacement(id oid.ID) ([][]netmap.NodeInfo, error) {
c.placementMtx.Lock()
defer c.placementMtx.Unlock()
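// placement is deterministic for a fixed netmap and container,
// so it is computed once per object and then served from cache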
strID := id.EncodeToString()
if nn, ok := c.placementCache[strID]; ok {
return nn, nil
}
nn, err := placement.BuildObjectPlacement(
c.task.NetworkMap(),
c.task.ContainerNodes(),
&id,
)
if err != nil {
return nil, err
}
c.placementCache[strID] = nn
return nn, nil
}
func (c *Context) objectSize(id oid.ID) uint64 {
c.headMtx.RLock()
defer c.headMtx.RUnlock()
strID := id.EncodeToString()
if hdr, ok := c.headResponses[strID]; ok {
return hdr.objectSize
}
return 0
}
func (c *Context) objectHomoHash(id oid.ID) []byte {
c.headMtx.RLock()
defer c.headMtx.RUnlock()
strID := id.EncodeToString()
if hdr, ok := c.headResponses[strID]; ok {
return hdr.tzhash
}
return nil
}
func (c *Context) updateHeadResponses(hdr *object.Object) {
id, ok := hdr.ID()
if !ok {
return
}
strID := id.EncodeToString()
cs, _ := hdr.PayloadHomomorphicHash()
c.headMtx.Lock()
defer c.headMtx.Unlock()
if _, ok := c.headResponses[strID]; !ok {
c.headResponses[strID] = shortHeader{
tzhash: cs.Value(),
objectSize: hdr.PayloadSize(),
}
}
}
func (c *Context) updateSGInfo(id oid.ID, members []oid.ID) {
c.sgMembersMtx.Lock()
defer c.sgMembersMtx.Unlock()
c.sgMembersCache[id] = members
}


@@ -1,37 +0,0 @@
package auditor
import (
"context"
"fmt"
)
// Execute audits container data.
func (c *Context) Execute(ctx context.Context, onCompleted func()) {
defer onCompleted()
c.init()
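// the checks build on one another: PoR caches storage group members
// and object headers, PoP uses those to compose node pairs, and PDP
// plays the range-hash game on the composed pairs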
checks := []struct {
name string
exec func(context.Context)
}{
{name: "PoR", exec: c.executePoR},
{name: "PoP", exec: c.executePoP},
{name: "PDP", exec: c.executePDP},
}
for i := range checks {
c.log.Debug(fmt.Sprintf("executing %s check...", checks[i].name))
if c.expired(ctx) {
break
}
checks[i].exec(ctx)
if i == len(checks)-1 {
c.complete()
}
}
c.writeReport()
}


@@ -1,240 +0,0 @@
package auditor
import (
"bytes"
"context"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"go.uber.org/zap"
)
func (c *Context) executePDP(ctx context.Context) {
c.processPairs(ctx)
c.writePairsResult()
}
func (c *Context) processPairs(ctx context.Context) {
wg := new(sync.WaitGroup)
for i := range c.pairs {
p := &c.pairs[i]
wg.Add(1)
if err := c.pdpWorkerPool.Submit(func() {
c.processPair(ctx, p)
wg.Done()
}); err != nil {
wg.Done()
}
}
wg.Wait()
c.pdpWorkerPool.Release()
}
func (c *Context) processPair(ctx context.Context, p *gamePair) {
c.distributeRanges(p)
c.collectHashes(ctx, p)
c.analyzeHashes(p)
}
func (c *Context) distributeRanges(p *gamePair) {
p.rn1 = make([]*object.Range, hashRangeNumber-1)
p.rn2 = make([]*object.Range, hashRangeNumber-1)
for i := 0; i < hashRangeNumber-1; i++ {
p.rn1[i] = object.NewRange()
p.rn2[i] = object.NewRange()
}
notches := c.splitPayload(p.id)
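// the two nodes hash the same payload split at different cut points:
// node 1 covers [0:n2], [n2:n3], [n3:full], node 2 covers [0:n1],
// [n1:n2], [n2:full], so each side's hashes can be cross-checked
// against the other's (see analyzeHashes)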
{ // node 1
// [0:n2]
p.rn1[0].SetLength(notches[1])
// [n2:n3]
p.rn1[1].SetOffset(notches[1])
p.rn1[1].SetLength(notches[2] - notches[1])
// [n3:full]
p.rn1[2].SetOffset(notches[2])
p.rn1[2].SetLength(notches[3] - notches[2])
}
{ // node 2
// [0:n1]
p.rn2[0].SetLength(notches[0])
// [n1:n2]
p.rn2[1].SetOffset(notches[0])
p.rn2[1].SetLength(notches[1] - notches[0])
// [n2:full]
p.rn2[2].SetOffset(notches[1])
p.rn2[2].SetLength(notches[3] - notches[1])
}
}
func (c *Context) splitPayload(id oid.ID) []uint64 {
var (
prev uint64
size = c.objectSize(id)
notches = make([]uint64, 0, hashRangeNumber)
)
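// pick hashRangeNumber increasing cut offsets; the last one always
// equals the payload size, and the reserve term below keeps enough
// room for every remaining notch to advance by at least one byte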
for i := uint64(0); i < hashRangeNumber; i++ {
if i < hashRangeNumber-1 {
max := size - prev - (hashRangeNumber - i)
if max == 0 {
prev++
} else {
prev += rand.Uint64()%max + 1
}
} else {
prev = size
}
notches = append(notches, prev)
}
return notches
}
func (c *Context) collectHashes(ctx context.Context, p *gamePair) {
fn := func(n netmap.NodeInfo, rngs []*object.Range) [][]byte {
// Here we randomize the order a bit: the hypothesis is that this
// makes it harder for an unscrupulous node to come up with a
// reliable cheating strategy.
order := make([]int, len(rngs))
for i := range order {
order[i] = i
}
rand.Shuffle(len(order), func(i, j int) { order[i], order[j] = order[j], order[i] })
var getRangeHashPrm GetRangeHashPrm
getRangeHashPrm.CID = c.task.ContainerID()
getRangeHashPrm.OID = p.id
getRangeHashPrm.Node = n
res := make([][]byte, len(rngs))
for _, i := range order {
var sleepDur time.Duration
if c.maxPDPSleep > 0 {
sleepDur = time.Duration(rand.Uint64() % c.maxPDPSleep)
}
c.log.Debug(logs.AuditorSleepBeforeGetRangeHash,
zap.Stringer("interval", sleepDur),
)
time.Sleep(sleepDur)
getRangeHashPrm.Range = rngs[i]
h, err := c.cnrCom.GetRangeHash(ctx, getRangeHashPrm)
if err != nil {
c.log.Debug(logs.AuditorCouldNotGetPayloadRangeHash,
zap.Stringer("id", p.id),
zap.String("node", netmap.StringifyPublicKey(n)),
zap.String("error", err.Error()),
)
return res
}
res[i] = h
}
return res
}
p.hh1 = fn(p.n1, p.rn1)
p.hh2 = fn(p.n2, p.rn2)
}
func (c *Context) analyzeHashes(p *gamePair) {
if len(p.hh1) != hashRangeNumber-1 || len(p.hh2) != hashRangeNumber-1 {
c.failNodesPDP(p.n1, p.n2)
return
}
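// Tillich-Zemor hashes are homomorphic under concatenation:
// tz.Concat(H(a), H(b)) equals H(a||b). Node 2's first two ranges
// together cover node 1's first range [0:n2], node 1's last two
// ranges cover node 2's last range [n2:full], and the two combined
// must reproduce the full payload hash from the object header.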
h1, err := tz.Concat([][]byte{p.hh2[0], p.hh2[1]})
if err != nil || !bytes.Equal(p.hh1[0], h1) {
c.failNodesPDP(p.n1, p.n2)
return
}
h2, err := tz.Concat([][]byte{p.hh1[1], p.hh1[2]})
if err != nil || !bytes.Equal(p.hh2[2], h2) {
c.failNodesPDP(p.n1, p.n2)
return
}
fh, err := tz.Concat([][]byte{h1, h2})
if err != nil || !bytes.Equal(fh, c.objectHomoHash(p.id)) {
c.failNodesPDP(p.n1, p.n2)
return
}
c.passNodesPDP(p.n1, p.n2)
}
func (c *Context) failNodesPDP(ns ...netmap.NodeInfo) {
c.pairedMtx.Lock()
for i := range ns {
c.pairedNodes[ns[i].Hash()].failedPDP = true
}
c.pairedMtx.Unlock()
}
func (c *Context) passNodesPDP(ns ...netmap.NodeInfo) {
c.pairedMtx.Lock()
for i := range ns {
c.pairedNodes[ns[i].Hash()].passedPDP = true
}
c.pairedMtx.Unlock()
}
func (c *Context) writePairsResult() {
var failCount, okCount int
c.iteratePairedNodes(
func(netmap.NodeInfo) { failCount++ },
func(netmap.NodeInfo) { okCount++ },
)
failedNodes := make([][]byte, 0, failCount)
passedNodes := make([][]byte, 0, okCount)
c.iteratePairedNodes(
func(n netmap.NodeInfo) {
failedNodes = append(failedNodes, n.PublicKey())
},
func(n netmap.NodeInfo) {
passedNodes = append(passedNodes, n.PublicKey())
},
)
c.report.SetPDPResults(passedNodes, failedNodes)
}
func (c *Context) iteratePairedNodes(onFail, onPass func(netmap.NodeInfo)) {
for _, pairedNode := range c.pairedNodes {
if pairedNode.failedPDP {
onFail(pairedNode.node)
}
if pairedNode.passedPDP {
onPass(pairedNode.node)
}
}
}


@@ -1,187 +0,0 @@
package auditor
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"go.uber.org/zap"
)
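// hashRangeNumber is the number of payload cut points used in the PDP
// game (the last one always equals the full payload size), yielding
// hashRangeNumber-1 ranges per node; objects smaller than
// minGamePayloadSize cannot be split that way and are excluded from
// pairing (see processObjectPlacement)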
const (
hashRangeNumber = 4
minGamePayloadSize = hashRangeNumber * tz.Size
)
func (c *Context) executePoP(ctx context.Context) {
c.buildCoverage(ctx)
c.report.SetPlacementCounters(
c.counters.hit,
c.counters.miss,
c.counters.fail,
)
}
func (c *Context) buildCoverage(ctx context.Context) {
policy := c.task.ContainerStructure().PlacementPolicy()
// visit storage group members in random order
// and process each member's placement vectors
c.iterateSGMembersPlacementRand(func(id oid.ID, ind int, nodes []netmap.NodeInfo) bool {
c.processObjectPlacement(ctx, id, nodes, policy.ReplicaNumberByIndex(ind))
return c.containerCovered()
})
}
func (c *Context) containerCovered() bool {
// number of container nodes can be calculated once
return c.cnrNodesNum <= len(c.pairedNodes)
}
func (c *Context) processObjectPlacement(ctx context.Context, id oid.ID, nodes []netmap.NodeInfo, replicas uint32) {
var (
ok uint32
optimal bool
unpairedCandidate1, unpairedCandidate2 = -1, -1
pairedCandidate = -1
)
var getHeaderPrm GetHeaderPrm
getHeaderPrm.OID = id
getHeaderPrm.CID = c.task.ContainerID()
getHeaderPrm.NodeIsRelay = false
for i := 0; ok < replicas && i < len(nodes); i++ {
getHeaderPrm.Node = nodes[i]
// try to get object header from node
hdr, err := c.cnrCom.GetHeader(ctx, getHeaderPrm)
if err != nil {
c.log.Debug(logs.AuditorCouldNotGetObjectHeaderFromCandidate,
zap.Stringer("id", id),
zap.String("error", err.Error()),
)
continue
}
c.updateHeadResponses(hdr)
// increment success counter
ok++
// update optimal flag
optimal = ok == replicas && uint32(i) < replicas
// exclude small objects from coverage
if c.objectSize(id) < minGamePayloadSize {
continue
}
// update potential candidates to be paired
if _, ok := c.pairedNodes[nodes[i].Hash()]; !ok {
if unpairedCandidate1 < 0 {
unpairedCandidate1 = i
} else if unpairedCandidate2 < 0 {
unpairedCandidate2 = i
}
} else if pairedCandidate < 0 {
pairedCandidate = i
}
}
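// hit: the required replicas were all found within the optimal prefix
// of candidate nodes; miss: enough replicas, but some found outside
// that prefix; fail: fewer replicas than required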
if optimal {
c.counters.hit++
} else if ok == replicas {
c.counters.miss++
} else {
c.counters.fail++
}
if unpairedCandidate1 >= 0 {
if unpairedCandidate2 >= 0 {
c.composePair(id, nodes[unpairedCandidate1], nodes[unpairedCandidate2])
} else if pairedCandidate >= 0 {
c.composePair(id, nodes[unpairedCandidate1], nodes[pairedCandidate])
}
}
}
func (c *Context) composePair(id oid.ID, n1, n2 netmap.NodeInfo) {
c.pairs = append(c.pairs, gamePair{
n1: n1,
n2: n2,
id: id,
})
c.pairedNodes[n1.Hash()] = &pairMemberInfo{
node: n1,
}
c.pairedNodes[n2.Hash()] = &pairMemberInfo{
node: n2,
}
}
func (c *Context) iterateSGMembersPlacementRand(f func(oid.ID, int, []netmap.NodeInfo) bool) {
// iterate over the members of all storage groups (one by one)
// in randomly shuffled order
c.iterateSGMembersRand(func(id oid.ID) bool {
// build placement vector for the current object
nn, err := c.buildPlacement(id)
if err != nil {
c.log.Debug(logs.AuditorCouldNotBuildPlacementForObject,
zap.Stringer("id", id),
zap.String("error", err.Error()),
)
return false
}
for i, nodes := range nn {
if f(id, i, nodes) {
return true
}
}
return false
})
}
func (c *Context) iterateSGMembersRand(f func(oid.ID) bool) {
c.iterateSGInfo(func(members []oid.ID) bool {
ln := len(members)
processed := make(map[uint64]struct{}, ln-1)
for len(processed) < ln {
ind := nextRandUint64(uint64(ln), processed)
processed[ind] = struct{}{}
if f(members[ind]) {
return true
}
}
return false
})
}
func (c *Context) iterateSGInfo(f func([]oid.ID) bool) {
c.sgMembersMtx.RLock()
defer c.sgMembersMtx.RUnlock()
// we could randomize as for SG members, but the list
// of storage groups is already expected to be shuffled,
// since it is a Search response with unpredictable order
for i := range c.sgMembersCache {
if f(c.sgMembersCache[i]) {
return
}
}
}


@@ -1,156 +0,0 @@
package auditor
import (
"bytes"
"context"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
storagegroupSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/storagegroup"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"go.uber.org/zap"
)
func (c *Context) executePoR(ctx context.Context) {
wg := new(sync.WaitGroup)
sgs := c.task.StorageGroupList()
for _, sg := range sgs {
wg.Add(1)
if err := c.porWorkerPool.Submit(func() {
c.checkStorageGroupPoR(ctx, sg.ID(), sg.StorageGroup())
wg.Done()
}); err != nil {
wg.Done()
}
}
wg.Wait()
c.porWorkerPool.Release()
c.report.SetPoRCounters(c.porRequests.Load(), c.porRetries.Load())
}
func (c *Context) checkStorageGroupPoR(ctx context.Context, sgID oid.ID, sg storagegroupSDK.StorageGroup) {
members := sg.Members()
c.updateSGInfo(sgID, members)
var (
tzHash []byte
totalSize uint64
accRequests, accRetries uint32
)
var getHeaderPrm GetHeaderPrm
getHeaderPrm.CID = c.task.ContainerID()
getHeaderPrm.NodeIsRelay = true
homomorphicHashingEnabled := !containerSDK.IsHomomorphicHashingDisabled(c.task.ContainerStructure())
for i := range members {
flat, ok := c.getShuffledNodes(members[i], sgID)
if !ok {
continue
}
getHeaderPrm.OID = members[i]
for j := range flat {
accRequests++
if j > 0 { // in the best case, the audit gets the object header on the first try
accRetries++
}
getHeaderPrm.Node = flat[j]
hdr, err := c.cnrCom.GetHeader(ctx, getHeaderPrm)
if err != nil {
c.log.Debug(logs.AuditorCantHeadObject,
zap.String("remote_node", netmap.StringifyPublicKey(flat[j])),
zap.Stringer("oid", members[i]),
)
continue
}
// update cache for PoR and PDP audit checks
c.updateHeadResponses(hdr)
if homomorphicHashingEnabled {
cs, _ := hdr.PayloadHomomorphicHash()
if len(tzHash) == 0 {
tzHash = cs.Value()
} else {
tzHash, err = tz.Concat([][]byte{
tzHash,
cs.Value(),
})
if err != nil {
c.log.Debug(logs.AuditorCantConcatenateTzHash,
zap.String("oid", members[i].String()),
zap.String("error", err.Error()))
break
}
}
}
totalSize += hdr.PayloadSize()
break
}
}
c.porRequests.Add(accRequests)
c.porRetries.Add(accRetries)
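// the total size and aggregated hash recomputed from member headers
// must match the validation data recorded in the storage group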
sizeCheck := sg.ValidationDataSize() == totalSize
cs, _ := sg.ValidationDataHash()
tzCheck := !homomorphicHashingEnabled || bytes.Equal(tzHash, cs.Value())
c.writeCheckReport(sizeCheck, tzCheck, sgID, sg, totalSize)
}
func (c *Context) writeCheckReport(sizeCheck, tzCheck bool, sgID oid.ID, sg storagegroupSDK.StorageGroup, totalSize uint64) {
if sizeCheck && tzCheck {
c.report.PassedPoR(sgID)
} else {
if !sizeCheck {
c.log.Debug(logs.AuditorStorageGroupSizeCheckFailed,
zap.Uint64("expected", sg.ValidationDataSize()),
zap.Uint64("got", totalSize))
}
if !tzCheck {
c.log.Debug(logs.AuditorStorageGroupTzHashCheckFailed)
}
c.report.FailedPoR(sgID)
}
}
func (c *Context) getShuffledNodes(member oid.ID, sgID oid.ID) ([]netmap.NodeInfo, bool) {
objectPlacement, err := c.buildPlacement(member)
if err != nil {
c.log.Info(logs.AuditorCantBuildPlacementForStorageGroupMember,
zap.Stringer("sg", sgID),
zap.String("member_id", member.String()),
)
return nil, false
}
flat := placement.FlattenNodes(objectPlacement)
rand.Shuffle(len(flat), func(i, j int) {
flat[i], flat[j] = flat[j], flat[i]
})
return flat, true
}


@@ -1,18 +0,0 @@
package auditor
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
)
// nextRandUint64 returns a random uint64 number in [0; n) that is not in the exclude map.
// Panics if len(exclude) >= n.
func nextRandUint64(n uint64, exclude map[uint64]struct{}) uint64 {
ln := uint64(len(exclude))
ind := rand.Uint64() % (n - ln)
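// ind points at one of the n-len(exclude) free slots; scanning upward
// skips excluded values and cannot reach n, because at most
// len(exclude) excluded values lie at or above ind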
for i := ind; ; i++ {
if _, ok := exclude[i]; !ok {
return i
}
}
}


@@ -1,89 +0,0 @@
package audit
import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/audit"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// Report tracks the progress of auditing container data.
type Report struct {
mu sync.RWMutex
res audit.Result
}
// Reporter is an interface of the entity that records
// the data audit report.
type Reporter interface {
WriteReport(r *Report) error
}
// NewReport creates and returns a blank Report instance.
func NewReport(cnr cid.ID) *Report {
var rep Report
rep.res.ForContainer(cnr)
return &rep
}
// Result returns the data audit result structure.
func (r *Report) Result() *audit.Result {
r.mu.RLock()
defer r.mu.RUnlock()
return &r.res
}
// Complete completes audit report.
func (r *Report) Complete() {
r.mu.Lock()
defer r.mu.Unlock()
r.res.Complete()
}
// PassedPoR updates list of passed storage groups.
func (r *Report) PassedPoR(sg oid.ID) {
r.mu.Lock()
defer r.mu.Unlock()
r.res.SubmitPassedStorageGroup(sg)
}
// FailedPoR updates list of failed storage groups.
func (r *Report) FailedPoR(sg oid.ID) {
r.mu.Lock()
defer r.mu.Unlock()
r.res.SubmitFailedStorageGroup(sg)
}
// SetPlacementCounters sets counters of compliance with placement.
func (r *Report) SetPlacementCounters(hit, miss, fail uint32) {
r.mu.Lock()
defer r.mu.Unlock()
r.res.SetHits(hit)
r.res.SetMisses(miss)
r.res.SetFailures(fail)
}
// SetPDPResults sets lists of nodes according to their PDP results.
func (r *Report) SetPDPResults(passed, failed [][]byte) {
r.mu.Lock()
defer r.mu.Unlock()
r.res.SubmitPassedStorageNodes(passed)
r.res.SubmitFailedStorageNodes(failed)
}
// SetPoRCounters sets the numbers of head requests and retries at the PoR audit stage.
func (r *Report) SetPoRCounters(requests, retries uint32) {
r.mu.Lock()
defer r.mu.Unlock()
r.res.SetRequestsPoR(requests)
r.res.SetRetriesPoR(retries)
}


@@ -1,120 +0,0 @@
package audit
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
// Task groups the container audit parameters.
type Task struct {
cancelCh <-chan struct{}
reporter Reporter
idCnr cid.ID
cnr container.Container
nm *netmap.NetMap
cnrNodes [][]netmap.NodeInfo
sgList []storagegroup.StorageGroup
}
// WithReporter sets audit report writer.
func (t *Task) WithReporter(r Reporter) *Task {
if t != nil {
t.reporter = r
}
return t
}
// Reporter returns audit report writer.
func (t *Task) Reporter() Reporter {
return t.reporter
}
// WithCancelChannel sets the channel used to cancel the audit task.
func (t *Task) WithCancelChannel(ch <-chan struct{}) *Task {
if ch != nil {
t.cancelCh = ch
}
return t
}
// CancelChannel returns the task cancellation channel.
func (t *Task) CancelChannel() <-chan struct{} {
return t.cancelCh
}
// WithContainerID sets identifier of the container under audit.
func (t *Task) WithContainerID(cnr cid.ID) *Task {
if t != nil {
t.idCnr = cnr
}
return t
}
// ContainerID returns identifier of the container under audit.
func (t *Task) ContainerID() cid.ID {
return t.idCnr
}
// WithContainerStructure sets structure of the container under audit.
func (t *Task) WithContainerStructure(cnr container.Container) *Task {
if t != nil {
t.cnr = cnr
}
return t
}
// ContainerStructure returns structure of the container under audit.
func (t *Task) ContainerStructure() container.Container {
return t.cnr
}
// WithContainerNodes sets nodes in the container under audit.
func (t *Task) WithContainerNodes(cnrNodes [][]netmap.NodeInfo) *Task {
if t != nil {
t.cnrNodes = cnrNodes
}
return t
}
// NetworkMap returns network map of audit epoch.
func (t *Task) NetworkMap() *netmap.NetMap {
return t.nm
}
// WithNetworkMap sets network map of audit epoch.
func (t *Task) WithNetworkMap(nm *netmap.NetMap) *Task {
if t != nil {
t.nm = nm
}
return t
}
// ContainerNodes returns nodes in the container under audit.
func (t *Task) ContainerNodes() [][]netmap.NodeInfo {
return t.cnrNodes
}
// WithStorageGroupList sets a list of storage groups from container under audit.
func (t *Task) WithStorageGroupList(sgList []storagegroup.StorageGroup) *Task {
if t != nil {
t.sgList = sgList
}
return t
}
// StorageGroupList returns list of storage groups from container under audit.
func (t *Task) StorageGroupList() []storagegroup.StorageGroup {
return t.sgList
}


@@ -1,85 +0,0 @@
package audittask
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/auditor"
"go.uber.org/zap"
)
// Listen starts processing tasks from the queue.
//
// The listener is terminated by context.
func (m *Manager) Listen(ctx context.Context) {
m.log.Info(logs.TaskmanagerProcessRoutine,
zap.Uint32("queue_capacity", m.queueCap),
)
m.ch = make(chan *audit.Task, m.queueCap)
for {
select {
case <-ctx.Done():
m.log.Warn(logs.TaskmanagerStopListenerByContext,
zap.String("error", ctx.Err().Error()),
)
m.workerPool.Release()
return
case task, ok := <-m.ch:
if !ok {
m.log.Warn(logs.TaskmanagerQueueChannelIsClosed)
return
}
tCtx, tCancel := context.WithCancel(ctx) // cancel task in case of listen cancel
go func() {
select {
case <-tCtx.Done(): // listen cancelled or task completed
return
case <-task.CancelChannel(): // new epoch
tCancel()
}
}()
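// tCancel doubles as the completion callback: Execute defers it,
// so a finished task also stops the watcher goroutine above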
m.handleTask(tCtx, task, tCancel)
}
}
}
func (m *Manager) handleTask(ctx context.Context, task *audit.Task, onCompleted func()) {
pdpPool, err := m.pdpPoolGenerator()
if err != nil {
m.log.Error(logs.TaskmanagerCouldNotGeneratePDPWorkerPool,
zap.String("error", err.Error()),
)
onCompleted()
return
}
porPool, err := m.porPoolGenerator()
if err != nil {
m.log.Error(logs.TaskmanagerCouldNotGeneratePoRWorkerPool,
zap.String("error", err.Error()),
)
onCompleted()
return
}
auditContext := m.generateContext(task).
WithPDPWorkerPool(pdpPool).
WithPoRWorkerPool(porPool)
if err := m.workerPool.Submit(func() { auditContext.Execute(ctx, onCompleted) }); err != nil {
// maybe we should report it
m.log.Warn(logs.TaskmanagerCouldNotSubmitAuditTask)
onCompleted()
}
}
func (m *Manager) generateContext(task *audit.Task) *auditor.Context {
return auditor.NewContext(m.ctxPrm).
WithTask(task)
}


@@ -1,107 +0,0 @@
package audittask
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/auditor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Manager represents an entity performing data audit tasks.
type Manager struct {
*cfg
ch chan *audit.Task
}
// Option is a Manager's constructor option.
type Option func(*cfg)
type cfg struct {
queueCap uint32
log *logger.Logger
ctxPrm auditor.ContextPrm
workerPool util.WorkerPool
pdpPoolGenerator, porPoolGenerator func() (util.WorkerPool, error)
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
}
}
// New creates, initializes and returns a new Manager instance.
func New(opts ...Option) *Manager {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
return &Manager{
cfg: c,
}
}
// WithLogger returns option to specify Manager's logger.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = &logger.Logger{Logger: l.With(zap.String("component", "Audit task manager"))}
c.ctxPrm.SetLogger(l)
}
}
// WithWorkerPool returns option to set worker pool
// for task execution.
func WithWorkerPool(p util.WorkerPool) Option {
return func(c *cfg) {
c.workerPool = p
}
}
// WithQueueCapacity returns option to set task queue capacity.
func WithQueueCapacity(capacity uint32) Option {
return func(c *cfg) {
c.queueCap = capacity
}
}
// WithContainerCommunicator returns option to set component of communication
// with container nodes.
func WithContainerCommunicator(cnrCom auditor.ContainerCommunicator) Option {
return func(c *cfg) {
c.ctxPrm.SetContainerCommunicator(cnrCom)
}
}
// WithMaxPDPSleepInterval returns option to set maximum sleep interval
// between range hash requests as part of PDP check.
func WithMaxPDPSleepInterval(dur time.Duration) Option {
return func(c *cfg) {
c.ctxPrm.SetMaxPDPSleep(dur)
}
}
// WithPDPWorkerPoolGenerator returns option to set the generator of worker pools for PDP pairs processing.
// The caller of the generator owns each returned pool and must release it appropriately.
func WithPDPWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option {
return func(c *cfg) {
c.pdpPoolGenerator = f
}
}
// WithPoRWorkerPoolGenerator returns option to set the generator of worker pools for PoR SG processing.
// The caller of the generator owns each returned pool and must release it appropriately.
func WithPoRWorkerPoolGenerator(f func() (util.WorkerPool, error)) Option {
return func(c *cfg) {
c.porPoolGenerator = f
}
}


@@ -1,10 +0,0 @@
package audittask
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
)
// PushTask adds a task to the queue for processing.
func (m *Manager) PushTask(t *audit.Task) {
m.ch <- t
}


@@ -1,11 +0,0 @@
package audittask
// Reset pops all tasks from the queue.
// Returns the number of popped elements.
func (m *Manager) Reset() (popped int) {
for ; len(m.ch) > 0; popped++ {
<-m.ch
}
return
}