frostfs-node/pkg/innerring/processors/audit/processor.go
Leonard Lyubich 2ebcbe70fb [#255] ir/audit: Make task manager to return number of skipped tasks
Add numeric return from TaskManager.Reset method that shows the number of
canceled tasks. This values will be used for assessment of the progress of
the audit.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-12-25 16:49:27 +03:00

153 lines
4.2 KiB
Go

package audit
import (
"context"
"github.com/nspcc-dev/neo-go/pkg/util"
SDKClient "github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
"github.com/panjf2000/ants/v2"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type (
// Indexer is a callback interface for inner ring global state.
Indexer interface {
Index() int32
InnerRingSize() int32
}
// NeoFSClientCache is an interface for cache of neofs RPC clients
NeoFSClientCache interface {
Get(address string, opts ...SDKClient.Option) (*SDKClient.Client, error)
}
TaskManager interface {
PushTask(*audit.Task) error
// Must skip all tasks planned for execution and
// return their number.
Reset() int
}
// Processor of events related with data audit.
Processor struct {
log *zap.Logger
pool *ants.Pool
containerContract util.Uint160
auditContract util.Uint160
morphClient *client.Client
irList Indexer
clientCache NeoFSClientCache
containerClient *wrapContainer.Wrapper
netmapClient *wrapNetmap.Wrapper
taskManager TaskManager
reporter audit.Reporter
prevAuditCanceler context.CancelFunc
}
// Params of the processor constructor.
Params struct {
Log *zap.Logger
NetmapContract util.Uint160
ContainerContract util.Uint160
AuditContract util.Uint160
MorphClient *client.Client
IRList Indexer
ClientCache NeoFSClientCache
TaskManager TaskManager
Reporter audit.Reporter
}
)
type epochAuditReporter struct {
epoch uint64
rep audit.Reporter
}
// AuditProcessor manages audit tasks and fills queue for next epoch. This
// process must not be interrupted by new audit epoch, so we limit pool size
// for processor to one.
const ProcessorPoolSize = 1
// New creates audit processor instance.
func New(p *Params) (*Processor, error) {
switch {
case p.Log == nil:
return nil, errors.New("ir/audit: logger is not set")
case p.MorphClient == nil:
return nil, errors.New("ir/audit: neo:morph client is not set")
case p.IRList == nil:
return nil, errors.New("ir/audit: global state is not set")
case p.ClientCache == nil:
return nil, errors.New("ir/audit: neofs RPC client cache is not set")
}
pool, err := ants.NewPool(ProcessorPoolSize, ants.WithNonblocking(true))
if err != nil {
return nil, errors.Wrap(err, "ir/audit: can't create worker pool")
}
// creating enhanced client for getting network map
netmapClient, err := invoke.NewNoFeeNetmapClient(p.MorphClient, p.NetmapContract)
if err != nil {
return nil, err
}
// creating enhanced client for getting containers
containerClient, err := invoke.NewNoFeeContainerClient(p.MorphClient, p.ContainerContract)
if err != nil {
return nil, err
}
return &Processor{
log: p.Log,
pool: pool,
containerContract: p.ContainerContract,
auditContract: p.AuditContract,
morphClient: p.MorphClient,
irList: p.IRList,
clientCache: p.ClientCache,
containerClient: containerClient,
netmapClient: netmapClient,
taskManager: p.TaskManager,
reporter: p.Reporter,
prevAuditCanceler: func() {},
}, nil
}
// ListenerParsers for the 'event.Listener' event producer.
func (ap *Processor) ListenerParsers() []event.ParserInfo {
return nil
}
// ListenerHandlers for the 'event.Listener' event producer.
func (ap *Processor) ListenerHandlers() []event.HandlerInfo {
return nil
}
// TimersHandlers for the 'Timers' event producer.
func (ap *Processor) TimersHandlers() []event.HandlerInfo {
return nil
}
// StartAuditHandler for the internal event producer.
func (ap *Processor) StartAuditHandler() event.Handler {
return ap.handleNewAuditRound
}
func (r *epochAuditReporter) WriteReport(rep *audit.Report) error {
res := rep.Result()
res.SetAuditEpoch(r.epoch)
return r.rep.WriteReport(rep)
}