diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 19dd98e2..d4ab96f3 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -589,6 +589,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- NetmapClient: server.netmapClient, ContainerClient: cnrClient, IRList: server, + EpochSource: server, SGSource: clientCache, Key: &server.key.PrivateKey, RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index 058fbb5e..f751cbe4 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -12,7 +12,9 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/rand" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + storagegroupSDK "github.com/nspcc-dev/neofs-sdk-go/storagegroup" "go.uber.org/zap" ) @@ -80,14 +82,20 @@ func (ap *Processor) processStartAudit(epoch uint64) { }) // search storage groups - storageGroups := ap.findStorageGroups(containers[i], n) + storageGroupsIDs := ap.findStorageGroups(containers[i], n) log.Info("select storage groups for audit", + zap.Stringer("cid", containers[i]), + zap.Int("amount", len(storageGroupsIDs))) + + // filter expired storage groups + storageGroups := ap.filterExpiredSG(containers[i], storageGroupsIDs, nodes, *nm) + log.Info("filter expired storage groups for audit", zap.Stringer("cid", containers[i]), zap.Int("amount", len(storageGroups))) - // skip audit for containers - // without storage groups - if len(storageGroups) == 0 { + // skip audit for containers without + // non-expired storage groups + if len(storageGroupsIDs) == 0 { continue } @@ -161,3 +169,41 @@ func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) [] return sg } + +func (ap *Processor) filterExpiredSG(cid cid.ID, sgIDs []oid.ID, + cnr [][]netmap.NodeInfo, nm netmap.NetMap) map[oid.ID]storagegroupSDK.StorageGroup { + sgs := make(map[oid.ID]storagegroupSDK.StorageGroup, len(sgIDs)) + + var getSGPrm GetSGPrm + getSGPrm.CID = cid + getSGPrm.Container = cnr + getSGPrm.NetMap = nm + + for _, sgID := range sgIDs { + ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout) + + getSGPrm.OID = sgID + getSGPrm.Context = ctx + + sg, err := ap.sgSrc.GetSG(getSGPrm) + + cancel() + + if err != nil { + ap.log.Error( + "could not get storage group object for audit, skipping", + zap.Stringer("cid", cid), + zap.Stringer("oid", sgID), + zap.Error(err), + ) + continue + } + + // filter expired epochs + if sg.ExpirationEpoch() > ap.epochSrc.EpochCounter() { + sgs[sgID] = *sg + } + } + + return sgs +} diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index cdcad063..a4c3d50c 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -35,12 +35,20 @@ type ( Reset() int } + // EpochSource is an interface that provides actual + // epoch information. + EpochSource interface { + // EpochCounter must return current epoch number. + EpochCounter() uint64 + } + // Processor of events related to data audit. Processor struct { log *zap.Logger pool *ants.Pool irList Indexer sgSrc SGSource + epochSrc EpochSource searchTimeout time.Duration containerClient *cntClient.Client @@ -62,6 +70,7 @@ type ( TaskManager TaskManager Reporter audit.Reporter Key *ecdsa.PrivateKey + EpochSource EpochSource } ) @@ -148,6 +157,8 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/audit: audit result reporter is not set") case p.Key == nil: return nil, errors.New("ir/audit: signing key is not set") + case p.EpochSource == nil: + return nil, errors.New("ir/audit: epoch source is not set") } pool, err := ants.NewPool(ProcessorPoolSize, ants.WithNonblocking(true)) @@ -161,6 +172,7 @@ func New(p *Params) (*Processor, error) { containerClient: p.ContainerClient, irList: p.IRList, sgSrc: p.SGSource, + epochSrc: p.EpochSource, searchTimeout: p.RPCSearchTimeout, netmapClient: p.NetmapClient, taskManager: p.TaskManager, diff --git a/pkg/services/audit/auditor/context.go b/pkg/services/audit/auditor/context.go index 0028644e..b20adf4c 100644 --- a/pkg/services/audit/auditor/context.go +++ b/pkg/services/audit/auditor/context.go @@ -26,7 +26,7 @@ type Context struct { report *audit.Report sgMembersMtx sync.RWMutex - sgMembersCache map[int][]oid.ID + sgMembersCache map[oid.ID][]oid.ID placementMtx sync.Mutex placementCache map[string][][]netmap.NodeInfo @@ -178,7 +178,7 @@ func (c *Context) containerID() cid.ID { func (c *Context) init() { c.report = audit.NewReport(c.containerID()) - c.sgMembersCache = make(map[int][]oid.ID) + c.sgMembersCache = make(map[oid.ID][]oid.ID) c.placementCache = make(map[string][][]netmap.NodeInfo) @@ -293,9 +293,9 @@ func (c *Context) updateHeadResponses(hdr *object.Object) { } } -func (c *Context) updateSGInfo(ind int, members []oid.ID) { +func (c *Context) updateSGInfo(id oid.ID, members []oid.ID) { c.sgMembersMtx.Lock() defer c.sgMembersMtx.Unlock() - c.sgMembersCache[ind] = members + c.sgMembersCache[id] = members } diff --git a/pkg/services/audit/auditor/por.go b/pkg/services/audit/auditor/por.go index 04232e3d..e5f0a2d5 100644 --- a/pkg/services/audit/auditor/por.go +++ b/pkg/services/audit/auditor/por.go @@ -5,10 +5,10 @@ import ( "encoding/hex" "sync" - "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/rand" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + storagegroupSDK "github.com/nspcc-dev/neofs-sdk-go/storagegroup" "github.com/nspcc-dev/tzhash/tz" "go.uber.org/zap" ) @@ -17,13 +17,11 @@ func (c *Context) executePoR() { wg := new(sync.WaitGroup) sgs := c.task.StorageGroupList() - for i := range sgs { + for id, sg := range sgs { wg.Add(1) - sg := sgs[i] - if err := c.porWorkerPool.Submit(func() { - c.checkStorageGroupPoR(i, sg) + c.checkStorageGroupPoR(id, sg) wg.Done() }); err != nil { wg.Done() @@ -36,26 +34,9 @@ func (c *Context) executePoR() { c.report.SetPoRCounters(c.porRequests.Load(), c.porRetries.Load()) } -func (c *Context) checkStorageGroupPoR(ind int, sg oid.ID) { - var getSgPrm audit.GetSGPrm - - getSgPrm.Context = c.task.AuditContext() - getSgPrm.CID = c.task.ContainerID() - getSgPrm.OID = sg - getSgPrm.NetMap = *c.task.NetworkMap() - getSgPrm.Container = c.task.ContainerNodes() - - storageGroup, err := c.cnrCom.GetSG(getSgPrm) // get storage group - if err != nil { - c.log.Warn("can't get storage group", - zap.Stringer("sgid", sg), - zap.String("error", err.Error())) - - return - } - - members := storageGroup.Members() - c.updateSGInfo(ind, members) +func (c *Context) checkStorageGroupPoR(sgID oid.ID, sg storagegroupSDK.StorageGroup) { + members := sg.Members() + c.updateSGInfo(sgID, members) var ( tzHash []byte @@ -73,7 +54,7 @@ func (c *Context) checkStorageGroupPoR(ind int, sg oid.ID) { objectPlacement, err := c.buildPlacement(members[i]) if err != nil { c.log.Info("can't build placement for storage group member", - zap.Stringer("sg", sg), + zap.Stringer("sg", sgID), zap.String("member_id", members[i].String()), ) @@ -133,16 +114,16 @@ func (c *Context) checkStorageGroupPoR(ind int, sg oid.ID) { c.porRequests.Add(accRequests) c.porRetries.Add(accRetries) - sizeCheck := storageGroup.ValidationDataSize() == totalSize - cs, _ := storageGroup.ValidationDataHash() + sizeCheck := sg.ValidationDataSize() == totalSize + cs, _ := sg.ValidationDataHash() tzCheck := bytes.Equal(tzHash, cs.Value()) if sizeCheck && tzCheck { - c.report.PassedPoR(sg) // write report + c.report.PassedPoR(sgID) // write report } else { if !sizeCheck { c.log.Debug("storage group size check failed", - zap.Uint64("expected", storageGroup.ValidationDataSize()), + zap.Uint64("expected", sg.ValidationDataSize()), zap.Uint64("got", totalSize)) } @@ -150,6 +131,6 @@ func (c *Context) checkStorageGroupPoR(ind int, sg oid.ID) { c.log.Debug("storage group tz hash check failed") } - c.report.FailedPoR(sg) // write report + c.report.FailedPoR(sgID) // write report } } diff --git a/pkg/services/audit/task.go b/pkg/services/audit/task.go index f207033a..d5832947 100644 --- a/pkg/services/audit/task.go +++ b/pkg/services/audit/task.go @@ -7,6 +7,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/netmap" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + storagegroupSDK "github.com/nspcc-dev/neofs-sdk-go/storagegroup" ) // Task groups groups the container audit parameters. @@ -23,7 +24,7 @@ type Task struct { cnrNodes [][]netmap.NodeInfo - sgList []oid.ID + sgList map[oid.ID]storagegroupSDK.StorageGroup } // WithReporter sets audit report writer. @@ -111,7 +112,7 @@ func (t *Task) ContainerNodes() [][]netmap.NodeInfo { } // WithStorageGroupList sets a list of storage groups from container under audit. -func (t *Task) WithStorageGroupList(sgList []oid.ID) *Task { +func (t *Task) WithStorageGroupList(sgList map[oid.ID]storagegroupSDK.StorageGroup) *Task { if t != nil { t.sgList = sgList } @@ -120,6 +121,6 @@ func (t *Task) WithStorageGroupList(sgList []oid.ID) *Task { } // StorageGroupList returns list of storage groups from container under audit. -func (t *Task) StorageGroupList() []oid.ID { +func (t *Task) StorageGroupList() map[oid.ID]storagegroupSDK.StorageGroup { return t.sgList }