Dmitrii Stepanov
e8d340287f
Resolve containedctx linter. Cancel task by listen cancel. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
297 lines
5.9 KiB
Go
297 lines
5.9 KiB
Go
package auditor
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Context represents container data audit execution context.
|
|
type Context struct {
|
|
ContextPrm
|
|
|
|
task *audit.Task
|
|
|
|
report *audit.Report
|
|
|
|
sgMembersMtx sync.RWMutex
|
|
sgMembersCache map[oid.ID][]oid.ID
|
|
|
|
placementMtx sync.Mutex
|
|
placementCache map[string][][]netmap.NodeInfo
|
|
|
|
porRequests, porRetries atomic.Uint32
|
|
|
|
pairs []gamePair
|
|
|
|
pairedMtx sync.Mutex
|
|
pairedNodes map[uint64]*pairMemberInfo
|
|
|
|
counters struct {
|
|
hit, miss, fail uint32
|
|
}
|
|
|
|
cnrNodesNum int
|
|
|
|
headMtx sync.RWMutex
|
|
headResponses map[string]shortHeader
|
|
}
|
|
|
|
type pairMemberInfo struct {
|
|
failedPDP, passedPDP bool // at least one
|
|
|
|
node netmap.NodeInfo
|
|
}
|
|
|
|
type gamePair struct {
|
|
n1, n2 netmap.NodeInfo
|
|
|
|
id oid.ID
|
|
|
|
rn1, rn2 []*object.Range
|
|
|
|
hh1, hh2 [][]byte
|
|
}
|
|
|
|
type shortHeader struct {
|
|
tzhash []byte
|
|
|
|
objectSize uint64
|
|
}
|
|
|
|
// ContextPrm groups components required to conduct data audit checks.
|
|
type ContextPrm struct {
|
|
maxPDPSleep uint64
|
|
|
|
log *logger.Logger
|
|
|
|
cnrCom ContainerCommunicator
|
|
|
|
pdpWorkerPool, porWorkerPool util.WorkerPool
|
|
}
|
|
|
|
type commonCommunicatorPrm struct {
|
|
Node netmap.NodeInfo
|
|
|
|
OID oid.ID
|
|
CID cid.ID
|
|
}
|
|
|
|
// GetHeaderPrm groups parameter of GetHeader operation.
|
|
type GetHeaderPrm struct {
|
|
commonCommunicatorPrm
|
|
|
|
NodeIsRelay bool
|
|
}
|
|
|
|
// GetRangeHashPrm groups parameter of GetRangeHash operation.
|
|
type GetRangeHashPrm struct {
|
|
commonCommunicatorPrm
|
|
|
|
Range *object.Range
|
|
}
|
|
|
|
// ContainerCommunicator is an interface of
|
|
// component of communication with container nodes.
|
|
type ContainerCommunicator interface {
|
|
// GetHeader must return object header from the container node.
|
|
GetHeader(context.Context, GetHeaderPrm) (*object.Object, error)
|
|
|
|
// GetRangeHash must return homomorphic Tillich-Zemor hash of payload range of the
|
|
// object stored in container node.
|
|
GetRangeHash(context.Context, GetRangeHashPrm) ([]byte, error)
|
|
}
|
|
|
|
// NewContext creates, initializes and returns Context.
|
|
func NewContext(prm ContextPrm) *Context {
|
|
return &Context{
|
|
ContextPrm: prm,
|
|
}
|
|
}
|
|
|
|
// SetLogger sets logging component.
|
|
func (p *ContextPrm) SetLogger(l *logger.Logger) {
|
|
if p != nil {
|
|
p.log = l
|
|
}
|
|
}
|
|
|
|
// SetContainerCommunicator sets component of communication with container nodes.
|
|
func (p *ContextPrm) SetContainerCommunicator(cnrCom ContainerCommunicator) {
|
|
if p != nil {
|
|
p.cnrCom = cnrCom
|
|
}
|
|
}
|
|
|
|
// SetMaxPDPSleep sets maximum sleep interval between range hash requests.
|
|
// as part of PDP check.
|
|
func (p *ContextPrm) SetMaxPDPSleep(dur time.Duration) {
|
|
if p != nil {
|
|
p.maxPDPSleep = uint64(dur)
|
|
}
|
|
}
|
|
|
|
// WithTask sets container audit parameters.
|
|
func (c *Context) WithTask(t *audit.Task) *Context {
|
|
if c != nil {
|
|
c.task = t
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
// WithPDPWorkerPool sets worker pool for PDP pairs processing.
|
|
func (c *Context) WithPDPWorkerPool(pool util.WorkerPool) *Context {
|
|
if c != nil {
|
|
c.pdpWorkerPool = pool
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
// WithPoRWorkerPool sets worker pool for PoR SG processing.
|
|
func (c *Context) WithPoRWorkerPool(pool util.WorkerPool) *Context {
|
|
if c != nil {
|
|
c.porWorkerPool = pool
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
func (c *Context) containerID() cid.ID {
|
|
return c.task.ContainerID()
|
|
}
|
|
|
|
func (c *Context) init() {
|
|
c.report = audit.NewReport(c.containerID())
|
|
|
|
c.sgMembersCache = make(map[oid.ID][]oid.ID)
|
|
|
|
c.placementCache = make(map[string][][]netmap.NodeInfo)
|
|
|
|
cnrVectors := c.task.ContainerNodes()
|
|
for i := range cnrVectors {
|
|
c.cnrNodesNum += len(cnrVectors[i])
|
|
}
|
|
|
|
c.pairedNodes = make(map[uint64]*pairMemberInfo)
|
|
|
|
c.headResponses = make(map[string]shortHeader)
|
|
|
|
c.log = &logger.Logger{Logger: c.log.With(
|
|
zap.Stringer("container ID", c.task.ContainerID()),
|
|
)}
|
|
}
|
|
|
|
func (c *Context) expired(ctx context.Context) bool {
|
|
select {
|
|
case <-ctx.Done():
|
|
c.log.Debug("audit context is done",
|
|
zap.String("error", ctx.Err().Error()),
|
|
)
|
|
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *Context) complete() {
|
|
c.report.Complete()
|
|
}
|
|
|
|
func (c *Context) writeReport() {
|
|
c.log.Debug("writing audit report...")
|
|
|
|
if err := c.task.Reporter().WriteReport(c.report); err != nil {
|
|
c.log.Error("could not write audit report")
|
|
}
|
|
}
|
|
|
|
func (c *Context) buildPlacement(id oid.ID) ([][]netmap.NodeInfo, error) {
|
|
c.placementMtx.Lock()
|
|
defer c.placementMtx.Unlock()
|
|
|
|
strID := id.EncodeToString()
|
|
|
|
if nn, ok := c.placementCache[strID]; ok {
|
|
return nn, nil
|
|
}
|
|
|
|
nn, err := placement.BuildObjectPlacement(
|
|
c.task.NetworkMap(),
|
|
c.task.ContainerNodes(),
|
|
&id,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.placementCache[strID] = nn
|
|
|
|
return nn, nil
|
|
}
|
|
|
|
func (c *Context) objectSize(id oid.ID) uint64 {
|
|
c.headMtx.RLock()
|
|
defer c.headMtx.RUnlock()
|
|
|
|
strID := id.EncodeToString()
|
|
|
|
if hdr, ok := c.headResponses[strID]; ok {
|
|
return hdr.objectSize
|
|
}
|
|
|
|
return 0
|
|
}
|
|
|
|
func (c *Context) objectHomoHash(id oid.ID) []byte {
|
|
c.headMtx.RLock()
|
|
defer c.headMtx.RUnlock()
|
|
|
|
strID := id.EncodeToString()
|
|
|
|
if hdr, ok := c.headResponses[strID]; ok {
|
|
return hdr.tzhash
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Context) updateHeadResponses(hdr *object.Object) {
|
|
id, ok := hdr.ID()
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
strID := id.EncodeToString()
|
|
cs, _ := hdr.PayloadHomomorphicHash()
|
|
|
|
c.headMtx.Lock()
|
|
defer c.headMtx.Unlock()
|
|
|
|
if _, ok := c.headResponses[strID]; !ok {
|
|
c.headResponses[strID] = shortHeader{
|
|
tzhash: cs.Value(),
|
|
objectSize: hdr.PayloadSize(),
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Context) updateSGInfo(id oid.ID, members []oid.ID) {
|
|
c.sgMembersMtx.Lock()
|
|
defer c.sgMembersMtx.Unlock()
|
|
|
|
c.sgMembersCache[id] = members
|
|
}
|