[#328] container/load: Implement a metrics exchange controller

Implement a component that connects the value stores of the used space of
containers. Implement the Start/Stop operations on it, which will later
become the application handlers of the corresponding events from the
sidechain. The main task of the controller is to temporarily synchronize the
stages of calculating the global estimate of the used space in the
container. The details of the score calculation (the way of collecting /
transmitting local scores, the final score formula and writing to the
contract) are encapsulated in the dependency components, the controller is
abstracted from them.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-01-27 22:00:35 +03:00 committed by Leonard Lyubich
parent 5b550bff22
commit cc88320d6b
8 changed files with 733 additions and 1 deletions

2
go.mod
View file

@ -16,7 +16,7 @@ require (
github.com/multiformats/go-multihash v0.0.13 // indirect
github.com/nspcc-dev/hrw v1.0.9
github.com/nspcc-dev/neo-go v0.92.0
github.com/nspcc-dev/neofs-api-go v1.22.2
github.com/nspcc-dev/neofs-api-go v1.22.3-0.20210127171042-f654094edb2e
github.com/nspcc-dev/neofs-crypto v0.3.0
github.com/nspcc-dev/tzhash v1.4.0
github.com/panjf2000/ants/v2 v2.3.0

BIN
go.sum

Binary file not shown.

View file

@ -0,0 +1,309 @@
package loadcontroller
import (
"context"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
// StartPrm groups the required parameters of the Controller.Start method.
type StartPrm struct {
// Epoch number by which you want to select
// the values of the used space of containers.
Epoch uint64
}
type commonContext struct {
epoch uint64
ctrl *Controller
log *logger.Logger
ctx context.Context
}
type announceContext struct {
commonContext
}
// Start starts the processing of UsedSpaceAnnouncement values.
//
// Single Start operation overtakes all data from LocalMetrics to
// LocalAnnouncementTarget (Controller's parameters).
// No filter by epoch is used for the iterator, since it is expected
// that the source of metrics does not track the change of epochs.
//
// Each call acquires an announcement context for an Epoch parameter.
// At the very end of the operation, the context is released.
func (c *Controller) Start(prm StartPrm) {
// acquire announcement
execCtx := c.acquireAnnouncement(prm)
if execCtx == nil {
return
}
// finally stop and free the announcement
defer execCtx.freeAnnouncement()
// announce local values
execCtx.announce()
}
func (c *announceContext) announce() {
c.log.Debug("starting to announce the values of the metrics")
var (
metricsIterator Iterator
err error
)
// initialize iterator over locally collected metrics
metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator(c.ctx)
if err != nil {
c.log.Debug("could not initialize iterator over locally collected metrics",
zap.String("error", err.Error()),
)
return
}
// initialize target of local announcements
targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(c.ctx)
if err != nil {
c.log.Debug("could not initialize announcement accumulator",
zap.String("error", err.Error()),
)
return
}
// iterate over all collected metrics and write them to the target
err = metricsIterator.Iterate(
func(container.UsedSpaceAnnouncement) bool {
return true // local metrics don't know about epochs
},
func(a container.UsedSpaceAnnouncement) error {
a.SetEpoch(c.epoch) // set epoch explicitly
return targetWriter.Put(a)
},
)
if err != nil {
c.log.Debug("iterator over locally collected metrics aborted",
zap.String("error", err.Error()),
)
return
}
// finish writing
err = targetWriter.Close()
if err != nil {
c.log.Debug("could not finish writing local announcements",
zap.String("error", err.Error()),
)
return
}
c.log.Debug("announcement successfully finished")
}
func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext {
var ctx context.Context
c.announceMtx.Lock()
{
if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(context.Background())
c.mAnnounceCtx[prm.Epoch] = cancel
}
}
c.announceMtx.Unlock()
log := c.opts.log.With(
zap.Uint64("epoch", prm.Epoch),
)
if ctx == nil {
log.Debug("announcement is already started")
return nil
}
return &announceContext{
commonContext: commonContext{
epoch: prm.Epoch,
ctrl: c,
log: log,
ctx: ctx,
},
}
}
func (c *commonContext) freeAnnouncement() {
var stopped bool
c.ctrl.announceMtx.Lock()
{
var cancel context.CancelFunc
cancel, stopped = c.ctrl.mAnnounceCtx[c.epoch]
if stopped {
cancel()
delete(c.ctrl.mAnnounceCtx, c.epoch)
}
}
c.ctrl.announceMtx.Unlock()
if stopped {
c.log.Debug("announcement successfully interrupted")
} else {
c.log.Debug("announcement is not started or already interrupted")
}
}
// StopPrm groups the required parameters of the Controller.Stop method.
type StopPrm struct {
// Epoch number the analysis of the values of which must be interrupted.
Epoch uint64
}
type stopContext struct {
commonContext
}
// Stop interrupts the processing of UsedSpaceAnnouncement values.
//
// Single Stop operation releases an announcement context and overtakes
// all data from AnnouncementAccumulator to ResultReceiver (Controller's
// parameters). Only values for the specified Epoch parameter are processed.
//
// Each call acquires a report context for an Epoch parameter.
// At the very end of the operation, the context is released.
func (c *Controller) Stop(prm StopPrm) {
execCtx := c.acquireReport(prm)
if execCtx == nil {
return
}
// finally stop and free reporting
defer execCtx.freeReport()
// interrupt announcement
execCtx.freeAnnouncement()
// report the estimations
execCtx.report()
}
func (c *Controller) acquireReport(prm StopPrm) *stopContext {
var ctx context.Context
c.reportMtx.Lock()
{
if cancel := c.mReportCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(context.Background())
c.mReportCtx[prm.Epoch] = cancel
}
}
c.reportMtx.Unlock()
log := c.opts.log.With(
zap.Uint64("epoch", prm.Epoch),
)
if ctx == nil {
log.Debug("report is already started")
return nil
}
return &stopContext{
commonContext: commonContext{
epoch: prm.Epoch,
ctrl: c,
log: log,
},
}
}
func (c *commonContext) freeReport() {
var stopped bool
c.ctrl.reportMtx.Lock()
{
var cancel context.CancelFunc
cancel, stopped = c.ctrl.mReportCtx[c.epoch]
if stopped {
cancel()
delete(c.ctrl.mReportCtx, c.epoch)
}
}
c.ctrl.reportMtx.Unlock()
if stopped {
c.log.Debug("announcement successfully interrupted")
} else {
c.log.Debug("announcement is not started or already interrupted")
}
}
func (c *stopContext) report() {
var (
localIterator Iterator
err error
)
// initialize iterator over locally accumulated announcements
localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator(c.ctx)
if err != nil {
c.log.Debug("could not initialize iterator over locally accumulated announcements",
zap.String("error", err.Error()),
)
return
}
// initialize final destination of load estimations
resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(c.ctx)
if err != nil {
c.log.Debug("could not initialize result target",
zap.String("error", err.Error()),
)
return
}
// iterate over all accumulated announcements and write them to the target
err = localIterator.Iterate(
usedSpaceFilterEpochEQ(c.epoch),
resultWriter.Put,
)
if err != nil {
c.log.Debug("iterator over local announcements aborted",
zap.String("error", err.Error()),
)
return
}
// finish writing
err = resultWriter.Close()
if err != nil {
c.log.Debug("could not finish writing load estimations",
zap.String("error", err.Error()),
)
}
}

View file

@ -0,0 +1,204 @@
package loadcontroller_test
import (
"context"
"crypto/sha256"
"math/rand"
"sync"
"testing"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
"github.com/stretchr/testify/require"
)
type testAnnouncementStorage struct {
w loadcontroller.Writer
i loadcontroller.Iterator
mtx sync.RWMutex
m map[uint64][]container.UsedSpaceAnnouncement
}
func newTestStorage() *testAnnouncementStorage {
return &testAnnouncementStorage{
m: make(map[uint64][]container.UsedSpaceAnnouncement),
}
}
func (s *testAnnouncementStorage) InitIterator(context.Context) (loadcontroller.Iterator, error) {
if s.i != nil {
return s.i, nil
}
return s, nil
}
func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
s.mtx.RLock()
defer s.mtx.RUnlock()
for _, v := range s.m {
for _, a := range v {
if f(a) {
if err := h(a); err != nil {
return err
}
}
}
}
return nil
}
func (s *testAnnouncementStorage) InitWriter(context.Context) (loadcontroller.Writer, error) {
if s.w != nil {
return s.w, nil
}
return s, nil
}
func (s *testAnnouncementStorage) Put(v container.UsedSpaceAnnouncement) error {
s.mtx.Lock()
s.m[v.Epoch()] = append(s.m[v.Epoch()], v)
s.mtx.Unlock()
return nil
}
func (s *testAnnouncementStorage) Close() error {
return nil
}
func randCID() *container.ID {
h := [sha256.Size]byte{}
rand.Read(h[:])
id := container.NewID()
id.SetSHA256(h)
return id
}
func randAnnouncement() container.UsedSpaceAnnouncement {
a := container.NewAnnouncement()
a.SetContainerID(randCID())
a.SetUsedSpace(rand.Uint64())
return *a
}
func TestSimpleScenario(t *testing.T) {
// create storage to write final estimations
resultStorage := newTestStorage()
// create storages to accumulate announcements
accumulatingStorageN2 := newTestStorage()
// create storage of local metrics
localStorageN1 := newTestStorage()
localStorageN2 := newTestStorage()
// create 2 controllers: 1st writes announcements to 2nd, 2nd directly to final destination
ctrlN1 := loadcontroller.New(loadcontroller.Prm{
LocalMetrics: localStorageN1,
AnnouncementAccumulator: newTestStorage(),
LocalAnnouncementTarget: &testAnnouncementStorage{
w: accumulatingStorageN2,
},
ResultReceiver: resultStorage,
})
ctrlN2 := loadcontroller.New(loadcontroller.Prm{
LocalMetrics: localStorageN2,
AnnouncementAccumulator: accumulatingStorageN2,
LocalAnnouncementTarget: &testAnnouncementStorage{
w: resultStorage,
},
ResultReceiver: resultStorage,
})
const processEpoch uint64 = 10
const goodNum = 4
// create 2 random values for processing epoch and 1 for some different
announces := make([]container.UsedSpaceAnnouncement, 0, goodNum)
for i := 0; i < goodNum; i++ {
a := randAnnouncement()
a.SetEpoch(processEpoch)
announces = append(announces, a)
}
// store one half of "good" announcements to 1st metrics storage, another - to 2nd
// and "bad" to both
for i := 0; i < goodNum/2; i++ {
require.NoError(t, localStorageN1.Put(announces[i]))
}
for i := goodNum / 2; i < goodNum; i++ {
require.NoError(t, localStorageN2.Put(announces[i]))
}
wg := new(sync.WaitGroup)
wg.Add(2)
startPrm := loadcontroller.StartPrm{
Epoch: processEpoch,
}
// start both controllers
go func() {
ctrlN1.Start(startPrm)
wg.Done()
}()
go func() {
ctrlN2.Start(startPrm)
wg.Done()
}()
wg.Wait()
wg.Add(2)
stopPrm := loadcontroller.StopPrm{
Epoch: processEpoch,
}
// stop both controllers
go func() {
ctrlN1.Stop(stopPrm)
wg.Done()
}()
go func() {
ctrlN2.Stop(stopPrm)
wg.Done()
}()
wg.Wait()
// result target should contain all "good" announcements and shoult not container the "bad" one
var res []container.UsedSpaceAnnouncement
err := resultStorage.Iterate(
func(a container.UsedSpaceAnnouncement) bool {
return true
},
func(a container.UsedSpaceAnnouncement) error {
res = append(res, a)
return nil
},
)
require.NoError(t, err)
for i := range announces {
require.Contains(t, res, announces[i])
}
}

View file

@ -0,0 +1,94 @@
package loadcontroller
import (
"context"
"fmt"
"sync"
)
// Prm groups the required parameters of the Controller's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Iterator over the used space values of the containers
// collected by the node locally.
LocalMetrics IteratorProvider
// Place of recording the local values of
// the used space of containers.
LocalAnnouncementTarget WriterProvider
// Iterator over the summarized used space scores
// from the various network participants.
AnnouncementAccumulator IteratorProvider
// Place of recording the final estimates of
// the used space of containers.
ResultReceiver WriterProvider
}
// Controller represents main handler for starting
// and interrupting container volume estimation.
//
// It binds the interfaces of the local value stores
// to the target storage points. Controller is abstracted
// from the internal storage device and the network location
// of the connecting components. At its core, it is a
// high-level start-stop trigger for calculations.
//
// For correct operation, the controller must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the constructor is immediately ready to work through
// API of external control of calculations and data transfer.
type Controller struct {
prm Prm
opts *options
announceMtx sync.Mutex
mAnnounceCtx map[uint64]context.CancelFunc
reportMtx sync.Mutex
mReportCtx map[uint64]context.CancelFunc
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v interface{}) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Controller.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Controller does not require additional
// initialization and is completely ready for work
func New(prm Prm, opts ...Option) *Controller {
switch {
case prm.LocalMetrics == nil:
panicOnPrmValue("LocalMetrics", prm.LocalMetrics)
case prm.AnnouncementAccumulator == nil:
panicOnPrmValue("AnnouncementAccumulator", prm.AnnouncementAccumulator)
case prm.LocalAnnouncementTarget == nil:
panicOnPrmValue("LocalAnnouncementTarget", prm.LocalAnnouncementTarget)
case prm.ResultReceiver == nil:
panicOnPrmValue("ResultReceiver", prm.ResultReceiver)
}
o := defaultOpts()
for _, opt := range opts {
opt(o)
}
return &Controller{
prm: prm,
opts: o,
mAnnounceCtx: make(map[uint64]context.CancelFunc),
mReportCtx: make(map[uint64]context.CancelFunc),
}
}

View file

@ -0,0 +1,88 @@
package loadcontroller
import (
"context"
"io"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
)
// UsedSpaceHandler describes the signature of the UsedSpaceAnnouncement
// value handling function.
//
// Termination of processing without failures is usually signaled
// with a zero error, while a specific value may describe the reason
// for failure.
type UsedSpaceHandler func(container.UsedSpaceAnnouncement) error
// UsedSpaceFilter describes the signature of the function for
// checking whether a value meets a certain criterion.
//
// Return of true means conformity, false - vice versa.
type UsedSpaceFilter func(container.UsedSpaceAnnouncement) bool
// Iterator is a group of methods provided by entity
// which can iterate over a group of UsedSpaceAnnouncement values.
type Iterator interface {
// Iterate must start an iterator over values that
// meet the filter criterion (returns true).
// For each such value should call a handler, the error
// of which should be directly returned from the method.
//
// Internal failures of the iterator are also signaled via
// an error. After a successful call to the last value
// handler, nil should be returned.
Iterate(UsedSpaceFilter, UsedSpaceHandler) error
}
// IteratorProvider is a group of methods provided
// by entity which generates iterators over
// UsedSpaceAnnouncement values.
type IteratorProvider interface {
// InitIterator should return an initialized Iterator.
//
// Initialization problems are reported via error.
// If no error was returned, then the Iterator must not be nil.
//
// Implementations can have different logic for different
// contexts, so specific ones may document their own behavior.
InitIterator(context.Context) (Iterator, error)
}
// Writer describes the interface for storing UsedSpaceAnnouncement values.
//
// This interface is provided by both local storage
// of values and remote (wrappers over the RPC).
type Writer interface {
// Put performs a write operation of UsedSpaceAnnouncement value
// and returns any error encountered.
//
// All values after the Close call must be flushed to the
// physical target. Implementations can cache values before
// Close operation.
//
// Put must not be called after Close.
Put(container.UsedSpaceAnnouncement) error
// Close exits with method-providing Writer.
//
// All cached values must be flushed before
// the Close's return.
//
// Methods must not be called after Close.
io.Closer
}
// IteratorProvider is a group of methods provided
// by entity which generates keepers of
// UsedSpaceAnnouncement values.
type WriterProvider interface {
// InitWriter should return an initialized Writer.
//
// Initialization problems are reported via error.
// If no error was returned, then the Writer must not be nil.
//
// Implementations can have different logic for different
// contexts, so specific ones may document their own behavior.
InitWriter(context.Context) (Writer, error)
}

View file

@ -0,0 +1,28 @@
package loadcontroller
import (
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of Controller.
type Option func(*options)
type options struct {
log *logger.Logger
}
func defaultOpts() *options {
return &options{
log: zap.L(),
}
}
// WithLogger returns option to specify logging component.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
if l != nil {
o.log = l
}
}
}

View file

@ -0,0 +1,9 @@
package loadcontroller
import "github.com/nspcc-dev/neofs-api-go/pkg/container"
func usedSpaceFilterEpochEQ(epoch uint64) UsedSpaceFilter {
return func(a container.UsedSpaceAnnouncement) bool {
return a.Epoch() == epoch
}
}