diff --git a/pkg/services/reputation/local/controller/calls.go b/pkg/services/reputation/local/controller/calls.go new file mode 100644 index 00000000..fefc81d7 --- /dev/null +++ b/pkg/services/reputation/local/controller/calls.go @@ -0,0 +1,195 @@ +package trustcontroller + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// ReportPrm groups the required parameters of the Controller.Report method. +type ReportPrm struct { + epoch uint64 +} + +// SetEpoch sets epoch number to select reputation values. +func (p *ReportPrm) SetEpoch(e uint64) { + p.epoch = e +} + +// Report reports local reputation values. +// +// Single Report operation overtakes all data from LocalTrustSource +// to LocalTrustTarget (Controller's parameters). +// +// Each call acquires a report context for an Epoch parameter. +// At the very end of the operation, the context is released. +func (c *Controller) Report(prm ReportPrm) { + // acquire report + reportCtx := c.acquireReport(prm.epoch) + if reportCtx == nil { + return + } + + // report local trust values + reportCtx.report() + + // finally stop and free the announcement + c.freeReport(prm.epoch, reportCtx.log) +} + +type reportContext struct { + epoch uint64 + + ctrl *Controller + + log *logger.Logger + + ctx Context +} + +type iteratorContext struct { + context.Context + + epoch uint64 +} + +func (c iteratorContext) Epoch() uint64 { + return c.epoch +} + +func (c *Controller) acquireReport(epoch uint64) *reportContext { + var ctx context.Context + + c.mtx.Lock() + + { + if cancel := c.mCtx[epoch]; cancel == nil { + ctx, cancel = context.WithCancel(context.Background()) + c.mCtx[epoch] = cancel + } + } + + c.mtx.Unlock() + + log := c.opts.log.With( + zap.Uint64("epoch", epoch), + ) + + if ctx == nil { + log.Debug("report is already started") + return nil + } + + return &reportContext{ + epoch: epoch, + ctrl: c, + log: log, + ctx: &iteratorContext{ + Context: ctx, + epoch: epoch, + }, + } +} + +func (c *reportContext) report() { + c.log.Debug("starting to report local trust values") + + // initialize iterator over locally collected values + iterator, err := c.ctrl.prm.LocalTrustSource.InitIterator(c.ctx) + if err != nil { + c.log.Debug("could not initialize iterator over local trust values", + zap.String("error", err.Error()), + ) + + return + } + + // initialize target of local trust values + targetWriter, err := c.ctrl.prm.LocalTrustTarget.InitWriter(c.ctx) + if err != nil { + c.log.Debug("could not initialize local trust target", + zap.String("error", err.Error()), + ) + + return + } + + // iterate over all values and write them to the target + err = iterator.Iterate( + func(t reputation.Trust) error { + // check if context is done + if err := c.ctx.Err(); err != nil { + return err + } + + return targetWriter.Write(t) + }, + ) + if err != nil && !errors.Is(err, context.Canceled) { + c.log.Debug("iterator over local trust failed", + zap.String("error", err.Error()), + ) + + return + } + + // finish writing + err = targetWriter.Close() + if err != nil { + c.log.Debug("could not finish writing local trust values", + zap.String("error", err.Error()), + ) + + return + } + + c.log.Debug("reporting successfully finished") +} + +func (c *Controller) freeReport(epoch uint64, log *logger.Logger) { + var stopped bool + + c.mtx.Lock() + + { + var cancel context.CancelFunc + + cancel, stopped = c.mCtx[epoch] + + if stopped { + cancel() + delete(c.mCtx, epoch) + } + } + + c.mtx.Unlock() + + if stopped { + log.Debug("reporting successfully interrupted") + } else { + log.Debug("reporting is not started or already interrupted") + } +} + +// StopPrm groups the required parameters of the Controller.Stop method. +type StopPrm struct { + epoch uint64 +} + +// SetEpoch sets epoch number the processing of the values of which must be interrupted. +func (p *StopPrm) SetEpoch(e uint64) { + p.epoch = e +} + +// Stop interrupts the processing of local trust values. +// +// Releases acquired report context. +func (c *Controller) Stop(prm StopPrm) { + c.freeReport( + prm.epoch, + c.opts.log.With(zap.Uint64("epoch", prm.epoch)), + ) +} diff --git a/pkg/services/reputation/local/controller/controller.go b/pkg/services/reputation/local/controller/controller.go new file mode 100644 index 00000000..e66d7ca0 --- /dev/null +++ b/pkg/services/reputation/local/controller/controller.go @@ -0,0 +1,82 @@ +package trustcontroller + +import ( + "context" + "fmt" + "sync" +) + +// Prm groups the required parameters of the Controller's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type Prm struct { + // Iterator over the reputation values + // collected by the node locally. + // + // Must not be nil. + LocalTrustSource IteratorProvider + + // Place of recording the local values of + // the used space of containers. + // + // Must not be nil. + LocalTrustTarget WriterProvider +} + +// Controller represents main handler for starting +// and interrupting the reporting local trust values. +// +// It binds the interfaces of the local value stores +// to the target storage points. Controller is abstracted +// from the internal storage device and the network location +// of the connecting components. At its core, it is a +// high-level start-stop trigger for reporting. +// +// For correct operation, the controller must be created +// using the constructor (New) based on the required parameters +// and optional components. After successful creation, +// the constructor is immediately ready to work through +// API of external control of calculations and data transfer. +type Controller struct { + prm Prm + + opts *options + + mtx sync.Mutex + mCtx map[uint64]context.CancelFunc +} + +const invalidPrmValFmt = "invalid parameter %s (%T):%v" + +func panicOnPrmValue(n string, v interface{}) { + panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) +} + +// New creates a new instance of the Controller. +// +// Panics if at least one value of the parameters is invalid. +// +// The created Controller does not require additional +// initialization and is completely ready for work +func New(prm Prm, opts ...Option) *Controller { + switch { + case prm.LocalTrustSource == nil: + panicOnPrmValue("LocalTrustSource", prm.LocalTrustSource) + case prm.LocalTrustTarget == nil: + panicOnPrmValue("LocalTrustTarget", prm.LocalTrustTarget) + } + + o := defaultOpts() + + for _, opt := range opts { + opt(o) + } + + return &Controller{ + prm: prm, + opts: o, + mCtx: make(map[uint64]context.CancelFunc), + } +} diff --git a/pkg/services/reputation/local/controller/deps.go b/pkg/services/reputation/local/controller/deps.go new file mode 100644 index 00000000..f6bc4a99 --- /dev/null +++ b/pkg/services/reputation/local/controller/deps.go @@ -0,0 +1,83 @@ +package trustcontroller + +import ( + "context" + "io" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" +) + +// Context wraps stdlib context +// with accompanying meta values. +type Context interface { + context.Context + + // Must return epoch number to select the values. + Epoch() uint64 +} + +// Writer describes the interface for storing reputation.Trust values. +// +// This interface is provided by both local storage +// of values and remote (wrappers over the RPC). +type Writer interface { + // Write performs a write operation of reputation.Trust value + // and returns any error encountered. + // + // All values after the Close call must be flushed to the + // physical target. Implementations can cache values before + // Close operation. + // + // Put must not be called after Close. + Write(reputation.Trust) error + + // Close exits with method-providing Writer. + // + // All cached values must be flushed before + // the Close's return. + // + // Methods must not be called after Close. + io.Closer +} + +// WriterProvider is a group of methods provided +// by entity which generates keepers of +// reputation.Trust values. +type WriterProvider interface { + // InitWriter should return an initialized Writer. + // + // Initialization problems are reported via error. + // If no error was returned, then the Writer must not be nil. + // + // Implementations can have different logic for different + // contexts, so specific ones may document their own behavior. + InitWriter(Context) (Writer, error) +} + +// Iterator is a group of methods provided by entity +// which can iterate over a group of reputation.Trust values. +type Iterator interface { + // Iterate must start an iterator over all trust values. + // For each value should call a handler, the error + // of which should be directly returned from the method. + // + // Internal failures of the iterator are also signaled via + // an error. After a successful call to the last value + // handler, nil should be returned. + Iterate(reputation.TrustHandler) error +} + +// IteratorProvider is a group of methods provided +// by entity which generates iterators over +// reputation.Trust values. +type IteratorProvider interface { + // InitIterator should return an initialized Iterator + // that iterates over values from IteratorContext.Epoch() epoch. + // + // Initialization problems are reported via error. + // If no error was returned, then the Iterator must not be nil. + // + // Implementations can have different logic for different + // contexts, so specific ones may document their own behavior. + InitIterator(Context) (Iterator, error) +} diff --git a/pkg/services/reputation/local/controller/opts.go b/pkg/services/reputation/local/controller/opts.go new file mode 100644 index 00000000..22d8bc34 --- /dev/null +++ b/pkg/services/reputation/local/controller/opts.go @@ -0,0 +1,30 @@ +package trustcontroller + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Option sets an optional parameter of Controller. +type Option func(*options) + +type options struct { + log *logger.Logger +} + +func defaultOpts() *options { + return &options{ + log: zap.L(), + } +} + +// WithLogger returns option to specify logging component. +// +// Ignores nil values. +func WithLogger(l *logger.Logger) Option { + return func(o *options) { + if l != nil { + o.log = l + } + } +} diff --git a/pkg/services/reputation/local/controller/util.go b/pkg/services/reputation/local/controller/util.go new file mode 100644 index 00000000..38442d51 --- /dev/null +++ b/pkg/services/reputation/local/controller/util.go @@ -0,0 +1,30 @@ +package trustcontroller + +type storageWrapper struct { + w Writer + i Iterator +} + +func (s storageWrapper) InitIterator(Context) (Iterator, error) { + return s.i, nil +} + +func (s storageWrapper) InitWriter(Context) (Writer, error) { + return s.w, nil +} + +// SimpleIteratorProvider returns IteratorProvider that provides +// static context-independent Iterator. +func SimpleIteratorProvider(i Iterator) IteratorProvider { + return &storageWrapper{ + i: i, + } +} + +// SimpleWriterProvider returns WriterProvider that provides +// static context-independent Writer. +func SimpleWriterProvider(w Writer) WriterProvider { + return &storageWrapper{ + w: w, + } +}