From cc88320d6beb031b75cdba20dac5835dd98ea214 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 27 Jan 2021 22:00:35 +0300 Subject: [PATCH] [#328] container/load: Implement a metrics exchange controller Implement a component that connects the value stores of the used space of containers. Implement the Start/Stop operations on it, which will later become the application handlers of the corresponding events from the sidechain. The main task of the controller is to temporarily synchronize the stages of calculating the global estimate of the used space in the container. The details of the score calculation (the way of collecting / transmitting local scores, the final score formula and writing to the contract) are encapsulated in the dependency components, the controller is abstracted from them. Signed-off-by: Leonard Lyubich --- go.mod | 2 +- go.sum | Bin 58994 -> 59137 bytes .../announcement/load/controller/calls.go | 309 ++++++++++++++++++ .../load/controller/calls_test.go | 204 ++++++++++++ .../load/controller/controller.go | 94 ++++++ .../announcement/load/controller/deps.go | 88 +++++ .../announcement/load/controller/opts.go | 28 ++ .../announcement/load/controller/util.go | 9 + 8 files changed, 733 insertions(+), 1 deletion(-) create mode 100644 pkg/services/container/announcement/load/controller/calls.go create mode 100644 pkg/services/container/announcement/load/controller/calls_test.go create mode 100644 pkg/services/container/announcement/load/controller/controller.go create mode 100644 pkg/services/container/announcement/load/controller/deps.go create mode 100644 pkg/services/container/announcement/load/controller/opts.go create mode 100644 pkg/services/container/announcement/load/controller/util.go diff --git a/go.mod b/go.mod index 119d0e9e..0eeb93a0 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.92.0 - github.com/nspcc-dev/neofs-api-go v1.22.2 + github.com/nspcc-dev/neofs-api-go v1.22.3-0.20210127171042-f654094edb2e github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index 85145a7ef5113130553a92660d05c31a72c9753a..01e4f9f63d899a89cd909cef59b6b1438e2c4523 100644 GIT binary patch delta 190 zcmex#hPm+^^M)KNli&IN(Gsl zm*|<2?`W1BUhHC#QxcFL?q66Km3m?brNR<;16(Pqb-162T|;T@y^ diff --git a/pkg/services/container/announcement/load/controller/calls.go b/pkg/services/container/announcement/load/controller/calls.go new file mode 100644 index 00000000..31beb860 --- /dev/null +++ b/pkg/services/container/announcement/load/controller/calls.go @@ -0,0 +1,309 @@ +package loadcontroller + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// StartPrm groups the required parameters of the Controller.Start method. +type StartPrm struct { + // Epoch number by which you want to select + // the values of the used space of containers. + Epoch uint64 +} + +type commonContext struct { + epoch uint64 + + ctrl *Controller + + log *logger.Logger + + ctx context.Context +} + +type announceContext struct { + commonContext +} + +// Start starts the processing of UsedSpaceAnnouncement values. +// +// Single Start operation overtakes all data from LocalMetrics to +// LocalAnnouncementTarget (Controller's parameters). +// No filter by epoch is used for the iterator, since it is expected +// that the source of metrics does not track the change of epochs. +// +// Each call acquires an announcement context for an Epoch parameter. +// At the very end of the operation, the context is released. +func (c *Controller) Start(prm StartPrm) { + // acquire announcement + execCtx := c.acquireAnnouncement(prm) + if execCtx == nil { + return + } + + // finally stop and free the announcement + defer execCtx.freeAnnouncement() + + // announce local values + execCtx.announce() +} + +func (c *announceContext) announce() { + c.log.Debug("starting to announce the values of the metrics") + + var ( + metricsIterator Iterator + err error + ) + + // initialize iterator over locally collected metrics + metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator(c.ctx) + if err != nil { + c.log.Debug("could not initialize iterator over locally collected metrics", + zap.String("error", err.Error()), + ) + + return + } + + // initialize target of local announcements + targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(c.ctx) + if err != nil { + c.log.Debug("could not initialize announcement accumulator", + zap.String("error", err.Error()), + ) + + return + } + + // iterate over all collected metrics and write them to the target + err = metricsIterator.Iterate( + func(container.UsedSpaceAnnouncement) bool { + return true // local metrics don't know about epochs + }, + func(a container.UsedSpaceAnnouncement) error { + a.SetEpoch(c.epoch) // set epoch explicitly + return targetWriter.Put(a) + }, + ) + if err != nil { + c.log.Debug("iterator over locally collected metrics aborted", + zap.String("error", err.Error()), + ) + + return + } + + // finish writing + err = targetWriter.Close() + if err != nil { + c.log.Debug("could not finish writing local announcements", + zap.String("error", err.Error()), + ) + + return + } + + c.log.Debug("announcement successfully finished") +} + +func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext { + var ctx context.Context + + c.announceMtx.Lock() + + { + if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil { + ctx, cancel = context.WithCancel(context.Background()) + c.mAnnounceCtx[prm.Epoch] = cancel + } + } + + c.announceMtx.Unlock() + + log := c.opts.log.With( + zap.Uint64("epoch", prm.Epoch), + ) + + if ctx == nil { + log.Debug("announcement is already started") + return nil + } + + return &announceContext{ + commonContext: commonContext{ + epoch: prm.Epoch, + ctrl: c, + log: log, + ctx: ctx, + }, + } +} + +func (c *commonContext) freeAnnouncement() { + var stopped bool + + c.ctrl.announceMtx.Lock() + + { + var cancel context.CancelFunc + + cancel, stopped = c.ctrl.mAnnounceCtx[c.epoch] + + if stopped { + cancel() + delete(c.ctrl.mAnnounceCtx, c.epoch) + } + } + + c.ctrl.announceMtx.Unlock() + + if stopped { + c.log.Debug("announcement successfully interrupted") + } else { + c.log.Debug("announcement is not started or already interrupted") + } +} + +// StopPrm groups the required parameters of the Controller.Stop method. +type StopPrm struct { + // Epoch number the analysis of the values of which must be interrupted. + Epoch uint64 +} + +type stopContext struct { + commonContext +} + +// Stop interrupts the processing of UsedSpaceAnnouncement values. +// +// Single Stop operation releases an announcement context and overtakes +// all data from AnnouncementAccumulator to ResultReceiver (Controller's +// parameters). Only values for the specified Epoch parameter are processed. +// +// Each call acquires a report context for an Epoch parameter. +// At the very end of the operation, the context is released. +func (c *Controller) Stop(prm StopPrm) { + execCtx := c.acquireReport(prm) + if execCtx == nil { + return + } + + // finally stop and free reporting + defer execCtx.freeReport() + + // interrupt announcement + execCtx.freeAnnouncement() + + // report the estimations + execCtx.report() +} + +func (c *Controller) acquireReport(prm StopPrm) *stopContext { + var ctx context.Context + + c.reportMtx.Lock() + + { + if cancel := c.mReportCtx[prm.Epoch]; cancel == nil { + ctx, cancel = context.WithCancel(context.Background()) + c.mReportCtx[prm.Epoch] = cancel + } + } + + c.reportMtx.Unlock() + + log := c.opts.log.With( + zap.Uint64("epoch", prm.Epoch), + ) + + if ctx == nil { + log.Debug("report is already started") + return nil + } + + return &stopContext{ + commonContext: commonContext{ + epoch: prm.Epoch, + ctrl: c, + log: log, + }, + } +} + +func (c *commonContext) freeReport() { + var stopped bool + + c.ctrl.reportMtx.Lock() + + { + var cancel context.CancelFunc + + cancel, stopped = c.ctrl.mReportCtx[c.epoch] + + if stopped { + cancel() + delete(c.ctrl.mReportCtx, c.epoch) + } + } + + c.ctrl.reportMtx.Unlock() + + if stopped { + c.log.Debug("announcement successfully interrupted") + } else { + c.log.Debug("announcement is not started or already interrupted") + } +} + +func (c *stopContext) report() { + var ( + localIterator Iterator + err error + ) + + // initialize iterator over locally accumulated announcements + localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator(c.ctx) + if err != nil { + c.log.Debug("could not initialize iterator over locally accumulated announcements", + zap.String("error", err.Error()), + ) + + return + } + + // initialize final destination of load estimations + resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(c.ctx) + if err != nil { + c.log.Debug("could not initialize result target", + zap.String("error", err.Error()), + ) + + return + } + + // iterate over all accumulated announcements and write them to the target + err = localIterator.Iterate( + usedSpaceFilterEpochEQ(c.epoch), + resultWriter.Put, + ) + if err != nil { + c.log.Debug("iterator over local announcements aborted", + zap.String("error", err.Error()), + ) + + return + } + + // finish writing + err = resultWriter.Close() + if err != nil { + c.log.Debug("could not finish writing load estimations", + zap.String("error", err.Error()), + ) + } +} diff --git a/pkg/services/container/announcement/load/controller/calls_test.go b/pkg/services/container/announcement/load/controller/calls_test.go new file mode 100644 index 00000000..8916610f --- /dev/null +++ b/pkg/services/container/announcement/load/controller/calls_test.go @@ -0,0 +1,204 @@ +package loadcontroller_test + +import ( + "context" + "crypto/sha256" + "math/rand" + "sync" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller" + "github.com/stretchr/testify/require" +) + +type testAnnouncementStorage struct { + w loadcontroller.Writer + + i loadcontroller.Iterator + + mtx sync.RWMutex + + m map[uint64][]container.UsedSpaceAnnouncement +} + +func newTestStorage() *testAnnouncementStorage { + return &testAnnouncementStorage{ + m: make(map[uint64][]container.UsedSpaceAnnouncement), + } +} + +func (s *testAnnouncementStorage) InitIterator(context.Context) (loadcontroller.Iterator, error) { + if s.i != nil { + return s.i, nil + } + + return s, nil +} + +func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error { + s.mtx.RLock() + defer s.mtx.RUnlock() + + for _, v := range s.m { + for _, a := range v { + if f(a) { + if err := h(a); err != nil { + return err + } + } + } + } + + return nil +} + +func (s *testAnnouncementStorage) InitWriter(context.Context) (loadcontroller.Writer, error) { + if s.w != nil { + return s.w, nil + } + + return s, nil +} + +func (s *testAnnouncementStorage) Put(v container.UsedSpaceAnnouncement) error { + s.mtx.Lock() + s.m[v.Epoch()] = append(s.m[v.Epoch()], v) + s.mtx.Unlock() + + return nil +} + +func (s *testAnnouncementStorage) Close() error { + return nil +} + +func randCID() *container.ID { + h := [sha256.Size]byte{} + + rand.Read(h[:]) + + id := container.NewID() + id.SetSHA256(h) + + return id +} + +func randAnnouncement() container.UsedSpaceAnnouncement { + a := container.NewAnnouncement() + a.SetContainerID(randCID()) + a.SetUsedSpace(rand.Uint64()) + + return *a +} + +func TestSimpleScenario(t *testing.T) { + // create storage to write final estimations + resultStorage := newTestStorage() + + // create storages to accumulate announcements + accumulatingStorageN2 := newTestStorage() + + // create storage of local metrics + localStorageN1 := newTestStorage() + localStorageN2 := newTestStorage() + + // create 2 controllers: 1st writes announcements to 2nd, 2nd directly to final destination + ctrlN1 := loadcontroller.New(loadcontroller.Prm{ + LocalMetrics: localStorageN1, + AnnouncementAccumulator: newTestStorage(), + LocalAnnouncementTarget: &testAnnouncementStorage{ + w: accumulatingStorageN2, + }, + ResultReceiver: resultStorage, + }) + + ctrlN2 := loadcontroller.New(loadcontroller.Prm{ + LocalMetrics: localStorageN2, + AnnouncementAccumulator: accumulatingStorageN2, + LocalAnnouncementTarget: &testAnnouncementStorage{ + w: resultStorage, + }, + ResultReceiver: resultStorage, + }) + + const processEpoch uint64 = 10 + + const goodNum = 4 + + // create 2 random values for processing epoch and 1 for some different + announces := make([]container.UsedSpaceAnnouncement, 0, goodNum) + + for i := 0; i < goodNum; i++ { + a := randAnnouncement() + a.SetEpoch(processEpoch) + + announces = append(announces, a) + } + + // store one half of "good" announcements to 1st metrics storage, another - to 2nd + // and "bad" to both + for i := 0; i < goodNum/2; i++ { + require.NoError(t, localStorageN1.Put(announces[i])) + } + + for i := goodNum / 2; i < goodNum; i++ { + require.NoError(t, localStorageN2.Put(announces[i])) + } + + wg := new(sync.WaitGroup) + wg.Add(2) + + startPrm := loadcontroller.StartPrm{ + Epoch: processEpoch, + } + + // start both controllers + go func() { + ctrlN1.Start(startPrm) + wg.Done() + }() + + go func() { + ctrlN2.Start(startPrm) + wg.Done() + }() + + wg.Wait() + wg.Add(2) + + stopPrm := loadcontroller.StopPrm{ + Epoch: processEpoch, + } + + // stop both controllers + go func() { + ctrlN1.Stop(stopPrm) + wg.Done() + }() + + go func() { + ctrlN2.Stop(stopPrm) + wg.Done() + }() + + wg.Wait() + + // result target should contain all "good" announcements and shoult not container the "bad" one + var res []container.UsedSpaceAnnouncement + + err := resultStorage.Iterate( + func(a container.UsedSpaceAnnouncement) bool { + return true + }, + func(a container.UsedSpaceAnnouncement) error { + res = append(res, a) + return nil + }, + ) + require.NoError(t, err) + + for i := range announces { + require.Contains(t, res, announces[i]) + } +} diff --git a/pkg/services/container/announcement/load/controller/controller.go b/pkg/services/container/announcement/load/controller/controller.go new file mode 100644 index 00000000..f5f3391f --- /dev/null +++ b/pkg/services/container/announcement/load/controller/controller.go @@ -0,0 +1,94 @@ +package loadcontroller + +import ( + "context" + "fmt" + "sync" +) + +// Prm groups the required parameters of the Controller's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type Prm struct { + // Iterator over the used space values of the containers + // collected by the node locally. + LocalMetrics IteratorProvider + + // Place of recording the local values of + // the used space of containers. + LocalAnnouncementTarget WriterProvider + + // Iterator over the summarized used space scores + // from the various network participants. + AnnouncementAccumulator IteratorProvider + + // Place of recording the final estimates of + // the used space of containers. + ResultReceiver WriterProvider +} + +// Controller represents main handler for starting +// and interrupting container volume estimation. +// +// It binds the interfaces of the local value stores +// to the target storage points. Controller is abstracted +// from the internal storage device and the network location +// of the connecting components. At its core, it is a +// high-level start-stop trigger for calculations. +// +// For correct operation, the controller must be created +// using the constructor (New) based on the required parameters +// and optional components. After successful creation, +// the constructor is immediately ready to work through +// API of external control of calculations and data transfer. +type Controller struct { + prm Prm + + opts *options + + announceMtx sync.Mutex + mAnnounceCtx map[uint64]context.CancelFunc + + reportMtx sync.Mutex + mReportCtx map[uint64]context.CancelFunc +} + +const invalidPrmValFmt = "invalid parameter %s (%T):%v" + +func panicOnPrmValue(n string, v interface{}) { + panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) +} + +// New creates a new instance of the Controller. +// +// Panics if at least one value of the parameters is invalid. +// +// The created Controller does not require additional +// initialization and is completely ready for work +func New(prm Prm, opts ...Option) *Controller { + switch { + case prm.LocalMetrics == nil: + panicOnPrmValue("LocalMetrics", prm.LocalMetrics) + case prm.AnnouncementAccumulator == nil: + panicOnPrmValue("AnnouncementAccumulator", prm.AnnouncementAccumulator) + case prm.LocalAnnouncementTarget == nil: + panicOnPrmValue("LocalAnnouncementTarget", prm.LocalAnnouncementTarget) + case prm.ResultReceiver == nil: + panicOnPrmValue("ResultReceiver", prm.ResultReceiver) + } + + o := defaultOpts() + + for _, opt := range opts { + opt(o) + } + + return &Controller{ + prm: prm, + opts: o, + mAnnounceCtx: make(map[uint64]context.CancelFunc), + mReportCtx: make(map[uint64]context.CancelFunc), + } +} diff --git a/pkg/services/container/announcement/load/controller/deps.go b/pkg/services/container/announcement/load/controller/deps.go new file mode 100644 index 00000000..0a72850d --- /dev/null +++ b/pkg/services/container/announcement/load/controller/deps.go @@ -0,0 +1,88 @@ +package loadcontroller + +import ( + "context" + "io" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" +) + +// UsedSpaceHandler describes the signature of the UsedSpaceAnnouncement +// value handling function. +// +// Termination of processing without failures is usually signaled +// with a zero error, while a specific value may describe the reason +// for failure. +type UsedSpaceHandler func(container.UsedSpaceAnnouncement) error + +// UsedSpaceFilter describes the signature of the function for +// checking whether a value meets a certain criterion. +// +// Return of true means conformity, false - vice versa. +type UsedSpaceFilter func(container.UsedSpaceAnnouncement) bool + +// Iterator is a group of methods provided by entity +// which can iterate over a group of UsedSpaceAnnouncement values. +type Iterator interface { + // Iterate must start an iterator over values that + // meet the filter criterion (returns true). + // For each such value should call a handler, the error + // of which should be directly returned from the method. + // + // Internal failures of the iterator are also signaled via + // an error. After a successful call to the last value + // handler, nil should be returned. + Iterate(UsedSpaceFilter, UsedSpaceHandler) error +} + +// IteratorProvider is a group of methods provided +// by entity which generates iterators over +// UsedSpaceAnnouncement values. +type IteratorProvider interface { + // InitIterator should return an initialized Iterator. + // + // Initialization problems are reported via error. + // If no error was returned, then the Iterator must not be nil. + // + // Implementations can have different logic for different + // contexts, so specific ones may document their own behavior. + InitIterator(context.Context) (Iterator, error) +} + +// Writer describes the interface for storing UsedSpaceAnnouncement values. +// +// This interface is provided by both local storage +// of values and remote (wrappers over the RPC). +type Writer interface { + // Put performs a write operation of UsedSpaceAnnouncement value + // and returns any error encountered. + // + // All values after the Close call must be flushed to the + // physical target. Implementations can cache values before + // Close operation. + // + // Put must not be called after Close. + Put(container.UsedSpaceAnnouncement) error + + // Close exits with method-providing Writer. + // + // All cached values must be flushed before + // the Close's return. + // + // Methods must not be called after Close. + io.Closer +} + +// IteratorProvider is a group of methods provided +// by entity which generates keepers of +// UsedSpaceAnnouncement values. +type WriterProvider interface { + // InitWriter should return an initialized Writer. + // + // Initialization problems are reported via error. + // If no error was returned, then the Writer must not be nil. + // + // Implementations can have different logic for different + // contexts, so specific ones may document their own behavior. + InitWriter(context.Context) (Writer, error) +} diff --git a/pkg/services/container/announcement/load/controller/opts.go b/pkg/services/container/announcement/load/controller/opts.go new file mode 100644 index 00000000..408bfe15 --- /dev/null +++ b/pkg/services/container/announcement/load/controller/opts.go @@ -0,0 +1,28 @@ +package loadcontroller + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Option sets an optional parameter of Controller. +type Option func(*options) + +type options struct { + log *logger.Logger +} + +func defaultOpts() *options { + return &options{ + log: zap.L(), + } +} + +// WithLogger returns option to specify logging component. +func WithLogger(l *logger.Logger) Option { + return func(o *options) { + if l != nil { + o.log = l + } + } +} diff --git a/pkg/services/container/announcement/load/controller/util.go b/pkg/services/container/announcement/load/controller/util.go new file mode 100644 index 00000000..df1ca703 --- /dev/null +++ b/pkg/services/container/announcement/load/controller/util.go @@ -0,0 +1,9 @@ +package loadcontroller + +import "github.com/nspcc-dev/neofs-api-go/pkg/container" + +func usedSpaceFilterEpochEQ(epoch uint64) UsedSpaceFilter { + return func(a container.UsedSpaceAnnouncement) bool { + return a.Epoch() == epoch + } +}