frostfs-node/pkg/services/reputation/local/controller/calls.go

194 lines
4.1 KiB
Go

package trustcontroller
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// ReportPrm carries the parameters required by Controller.Report.
type ReportPrm struct {
	// epoch selects which epoch's reputation values are reported
	epoch uint64
}

// SetEpoch stores the number of the epoch whose reputation values
// should be selected for reporting.
func (p *ReportPrm) SetEpoch(e uint64) { p.epoch = e }
// Report reports local reputation values.
//
// A single Report operation transfers all data from LocalTrustSource
// to LocalTrustTarget (Controller's parameters).
//
// Each call acquires a report context for the epoch set in prm. If a
// report for that epoch is already in progress, the call returns
// immediately without reporting anything. The acquired context is
// released when the operation ends; the release is deferred so that it
// also happens if reporting panics, which would otherwise leave the epoch
// marked as being reported and its context never cancelled.
func (c *Controller) Report(ctx context.Context, prm ReportPrm) {
	// acquire report
	rCtx, rep := c.acquireReporter(ctx, prm.epoch)
	if rep == nil {
		// a report for this epoch is already running
		return
	}

	// stop and free the report on every exit path
	defer c.freeReport(prm.epoch, rep.log)

	// report local trust values
	rep.report(rCtx)
}
// reporter holds the state of a single report operation for one epoch.
type reporter struct {
	// epoch is the number of the epoch being reported
	epoch uint64
	// ctrl is the Controller that acquired this reporter
	ctrl *Controller
	// log is the logger with the "epoch" field attached
	log *logger.Logger
	// ep is handed to the trust source and target when they are initialized
	ep common.EpochProvider
}
// epochProvider is a fixed-value holder of an epoch number.
type epochProvider struct {
	// epoch is the value returned by Epoch
	epoch uint64
}

// Epoch returns the epoch number stored in the provider.
func (p epochProvider) Epoch() uint64 { return p.epoch }
// acquireReporter tries to register a report operation for the epoch.
//
// When no cancel function is registered for the epoch, a cancellable
// context is derived from ctx, its cancel function is stored under the
// epoch, and the derived context is returned together with a new reporter.
// Otherwise the fact is logged and ctx is returned unchanged with a nil
// reporter.
func (c *Controller) acquireReporter(ctx context.Context, epoch uint64) (context.Context, *reporter) {
	c.mtx.Lock()

	alreadyRunning := c.mCtx[epoch] != nil
	if !alreadyRunning {
		var cancel context.CancelFunc

		ctx, cancel = context.WithCancel(ctx)
		c.mCtx[epoch] = cancel
	}

	c.mtx.Unlock()

	// every message of this operation carries the epoch number
	epochLog := &logger.Logger{
		Logger: c.opts.log.With(zap.Uint64("epoch", epoch)),
	}

	if alreadyRunning {
		epochLog.Debug(logs.ControllerReportIsAlreadyStarted)

		return ctx, nil
	}

	rep := &reporter{
		epoch: epoch,
		ctrl:  c,
		log:   epochLog,
		ep:    &epochProvider{epoch: epoch},
	}

	return ctx, rep
}
// report moves the local trust values of the reporter's epoch from the
// Controller's LocalTrustSource to its LocalTrustTarget.
//
// Every failure is logged at debug level and ends the operation; nothing
// is returned to the caller. An iteration that ends with
// context.Canceled is not treated as a failure: the writer is still
// closed afterwards.
func (r *reporter) report(ctx context.Context) {
	// debugErr logs msg together with the text of err
	debugErr := func(msg string, err error) {
		r.log.Debug(msg, zap.String("error", err.Error()))
	}

	r.log.Debug(logs.ControllerStartingToReportLocalTrustValues)

	// open the iterator over locally collected values
	trustIter, err := r.ctrl.prm.LocalTrustSource.InitIterator(r.ep)
	if err != nil {
		debugErr(logs.ControllerCouldNotInitializeIteratorOverLocalTrustValues, err)
		return
	}

	// open the destination of the local trust values
	trustWriter, err := r.ctrl.prm.LocalTrustTarget.InitWriter(r.ep)
	if err != nil {
		debugErr(logs.ControllerCouldNotInitializeLocalTrustTarget, err)
		return
	}

	// forward hands one value to the writer unless the context is done
	forward := func(t reputation.Trust) error {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		return trustWriter.Write(ctx, t)
	}

	// walk over all values and write them to the target
	err = trustIter.Iterate(forward)
	if !(err == nil || errors.Is(err, context.Canceled)) {
		debugErr(logs.ControllerIteratorOverLocalTrustFailed, err)
		return
	}

	// finish writing
	if err = trustWriter.Close(ctx); err != nil {
		debugErr(logs.ControllerCouldNotFinishWritingLocalTrustValues, err)
		return
	}

	r.log.Debug(logs.ControllerReportingSuccessfullyFinished)
}
// freeReport releases the report context registered for the epoch.
//
// If a cancel function is stored under the epoch, it is called and the
// entry is removed. The outcome (released, or nothing to release) is
// written to log at debug level.
func (c *Controller) freeReport(epoch uint64, log *logger.Logger) {
	c.mtx.Lock()

	cancel, found := c.mCtx[epoch]
	if found {
		cancel()
		delete(c.mCtx, epoch)
	}

	c.mtx.Unlock()

	// pick the message outside of the critical section
	msg := logs.ControllerReportingIsNotStartedOrAlreadyInterrupted
	if found {
		msg = logs.ControllerReportingSuccessfullyInterrupted
	}

	log.Debug(msg)
}
// StopPrm carries the parameters required by Controller.Stop.
type StopPrm struct {
	// epoch selects the report operation to interrupt
	epoch uint64
}

// SetEpoch stores the number of the epoch for which the processing of
// values must be interrupted.
func (p *StopPrm) SetEpoch(e uint64) { p.epoch = e }
// Stop interrupts the processing of local trust values for the epoch
// set in prm.
//
// The report context acquired for that epoch, if any, is released.
func (c *Controller) Stop(prm StopPrm) {
	// attach the epoch number to the messages of this operation
	epochLog := &logger.Logger{
		Logger: c.opts.log.With(zap.Uint64("epoch", prm.epoch)),
	}

	c.freeReport(prm.epoch, epochLog)
}