diff --git a/go.mod b/go.mod index 119d0e9e..0eeb93a0 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.92.0 - github.com/nspcc-dev/neofs-api-go v1.22.2 + github.com/nspcc-dev/neofs-api-go v1.22.3-0.20210127171042-f654094edb2e github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index 85145a7e..01e4f9f6 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-redis/redis v6.10.2+incompatible h1:SLbqrO/Ik1nhXA5/cbEs1P5MUBo1Qq4ihlNfGnnipPw= github.com/go-redis/redis v6.10.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= @@ -279,8 +280,8 @@ github.com/nspcc-dev/neo-go v0.73.1-pre.0.20200303142215-f5a1b928ce09/go.mod h1: github.com/nspcc-dev/neo-go v0.91.0/go.mod h1:G6HdOWvzQ6tlvFdvFSN/PgCzLPN/X/X4d5hTjFRUDcc= github.com/nspcc-dev/neo-go v0.92.0 h1:iKHpKLzpwE6RSXnQb0BoYWi+H1P/hNyQbMpPG0mY57Q= github.com/nspcc-dev/neo-go v0.92.0/go.mod h1:L7PyTzjK1j/PCAxvbKiVFkCMZDvsv82JbXlPxaH1t0Q= -github.com/nspcc-dev/neofs-api-go v1.22.2 h1:kNKM7wQMZ23nfFCUgBdkaYaszu4bjgabaUR8nTYBYZQ= -github.com/nspcc-dev/neofs-api-go v1.22.2/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8= +github.com/nspcc-dev/neofs-api-go v1.22.3-0.20210127171042-f654094edb2e h1:naIdoA6cWsD8ltPoWOqqRO9Su86PQHTma18jlq7tHEo= +github.com/nspcc-dev/neofs-api-go v1.22.3-0.20210127171042-f654094edb2e/go.mod h1:G7dqincfdjBrAbL5nxVp82emF05fSVEqe59ICsoRDI8= github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9KAEAUQR7dMxZmNA= github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM= diff --git a/pkg/services/container/announcement/load/controller/calls.go b/pkg/services/container/announcement/load/controller/calls.go new file mode 100644 index 00000000..31beb860 --- /dev/null +++ b/pkg/services/container/announcement/load/controller/calls.go @@ -0,0 +1,309 @@ +package loadcontroller + +import ( + "context" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// StartPrm groups the required parameters of the Controller.Start method. +type StartPrm struct { + // Epoch number by which you want to select + // the values of the used space of containers. + Epoch uint64 +} + +type commonContext struct { + epoch uint64 + + ctrl *Controller + + log *logger.Logger + + ctx context.Context +} + +type announceContext struct { + commonContext +} + +// Start starts the processing of UsedSpaceAnnouncement values. +// +// Single Start operation overtakes all data from LocalMetrics to +// LocalAnnouncementTarget (Controller's parameters). +// No filter by epoch is used for the iterator, since it is expected +// that the source of metrics does not track the change of epochs. +// +// Each call acquires an announcement context for an Epoch parameter. +// At the very end of the operation, the context is released. +func (c *Controller) Start(prm StartPrm) { + // acquire announcement + execCtx := c.acquireAnnouncement(prm) + if execCtx == nil { + return + } + + // finally stop and free the announcement + defer execCtx.freeAnnouncement() + + // announce local values + execCtx.announce() +} + +func (c *announceContext) announce() { + c.log.Debug("starting to announce the values of the metrics") + + var ( + metricsIterator Iterator + err error + ) + + // initialize iterator over locally collected metrics + metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator(c.ctx) + if err != nil { + c.log.Debug("could not initialize iterator over locally collected metrics", + zap.String("error", err.Error()), + ) + + return + } + + // initialize target of local announcements + targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(c.ctx) + if err != nil { + c.log.Debug("could not initialize announcement accumulator", + zap.String("error", err.Error()), + ) + + return + } + + // iterate over all collected metrics and write them to the target + err = metricsIterator.Iterate( + func(container.UsedSpaceAnnouncement) bool { + return true // local metrics don't know about epochs + }, + func(a container.UsedSpaceAnnouncement) error { + a.SetEpoch(c.epoch) // set epoch explicitly + return targetWriter.Put(a) + }, + ) + if err != nil { + c.log.Debug("iterator over locally collected metrics aborted", + zap.String("error", err.Error()), + ) + + return + } + + // finish writing + err = targetWriter.Close() + if err != nil { + c.log.Debug("could not finish writing local announcements", + zap.String("error", err.Error()), + ) + + return + } + + c.log.Debug("announcement successfully finished") +} + +func (c *Controller) acquireAnnouncement(prm StartPrm) *announceContext { + var ctx context.Context + + c.announceMtx.Lock() + + { + if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil { + ctx, cancel = context.WithCancel(context.Background()) + c.mAnnounceCtx[prm.Epoch] = cancel + } + } + + c.announceMtx.Unlock() + + log := c.opts.log.With( + zap.Uint64("epoch", prm.Epoch), + ) + + if ctx == nil { + log.Debug("announcement is already started") + return nil + } + + return &announceContext{ + commonContext: commonContext{ + epoch: prm.Epoch, + ctrl: c, + log: log, + ctx: ctx, + }, + } +} + +func (c *commonContext) freeAnnouncement() { + var stopped bool + + c.ctrl.announceMtx.Lock() + + { + var cancel context.CancelFunc + + cancel, stopped = c.ctrl.mAnnounceCtx[c.epoch] + + if stopped { + cancel() + delete(c.ctrl.mAnnounceCtx, c.epoch) + } + } + + c.ctrl.announceMtx.Unlock() + + if stopped { + c.log.Debug("announcement successfully interrupted") + } else { + c.log.Debug("announcement is not started or already interrupted") + } +} + +// StopPrm groups the required parameters of the Controller.Stop method. +type StopPrm struct { + // Epoch number the analysis of the values of which must be interrupted. + Epoch uint64 +} + +type stopContext struct { + commonContext +} + +// Stop interrupts the processing of UsedSpaceAnnouncement values. +// +// Single Stop operation releases an announcement context and overtakes +// all data from AnnouncementAccumulator to ResultReceiver (Controller's +// parameters). Only values for the specified Epoch parameter are processed. +// +// Each call acquires a report context for an Epoch parameter. +// At the very end of the operation, the context is released. +func (c *Controller) Stop(prm StopPrm) { + execCtx := c.acquireReport(prm) + if execCtx == nil { + return + } + + // finally stop and free reporting + defer execCtx.freeReport() + + // interrupt announcement + execCtx.freeAnnouncement() + + // report the estimations + execCtx.report() +} + +func (c *Controller) acquireReport(prm StopPrm) *stopContext { + var ctx context.Context + + c.reportMtx.Lock() + + { + if cancel := c.mReportCtx[prm.Epoch]; cancel == nil { + ctx, cancel = context.WithCancel(context.Background()) + c.mReportCtx[prm.Epoch] = cancel + } + } + + c.reportMtx.Unlock() + + log := c.opts.log.With( + zap.Uint64("epoch", prm.Epoch), + ) + + if ctx == nil { + log.Debug("report is already started") + return nil + } + + return &stopContext{ + commonContext: commonContext{ + epoch: prm.Epoch, + ctrl: c, + log: log, + }, + } +} + +func (c *commonContext) freeReport() { + var stopped bool + + c.ctrl.reportMtx.Lock() + + { + var cancel context.CancelFunc + + cancel, stopped = c.ctrl.mReportCtx[c.epoch] + + if stopped { + cancel() + delete(c.ctrl.mReportCtx, c.epoch) + } + } + + c.ctrl.reportMtx.Unlock() + + if stopped { + c.log.Debug("announcement successfully interrupted") + } else { + c.log.Debug("announcement is not started or already interrupted") + } +} + +func (c *stopContext) report() { + var ( + localIterator Iterator + err error + ) + + // initialize iterator over locally accumulated announcements + localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator(c.ctx) + if err != nil { + c.log.Debug("could not initialize iterator over locally accumulated announcements", + zap.String("error", err.Error()), + ) + + return + } + + // initialize final destination of load estimations + resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(c.ctx) + if err != nil { + c.log.Debug("could not initialize result target", + zap.String("error", err.Error()), + ) + + return + } + + // iterate over all accumulated announcements and write them to the target + err = localIterator.Iterate( + usedSpaceFilterEpochEQ(c.epoch), + resultWriter.Put, + ) + if err != nil { + c.log.Debug("iterator over local announcements aborted", + zap.String("error", err.Error()), + ) + + return + } + + // finish writing + err = resultWriter.Close() + if err != nil { + c.log.Debug("could not finish writing load estimations", + zap.String("error", err.Error()), + ) + } +} diff --git a/pkg/services/container/announcement/load/controller/calls_test.go b/pkg/services/container/announcement/load/controller/calls_test.go new file mode 100644 index 00000000..8916610f --- /dev/null +++ b/pkg/services/container/announcement/load/controller/calls_test.go @@ -0,0 +1,204 @@ +package loadcontroller_test + +import ( + "context" + "crypto/sha256" + "math/rand" + "sync" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller" + "github.com/stretchr/testify/require" +) + +type testAnnouncementStorage struct { + w loadcontroller.Writer + + i loadcontroller.Iterator + + mtx sync.RWMutex + + m map[uint64][]container.UsedSpaceAnnouncement +} + +func newTestStorage() *testAnnouncementStorage { + return &testAnnouncementStorage{ + m: make(map[uint64][]container.UsedSpaceAnnouncement), + } +} + +func (s *testAnnouncementStorage) InitIterator(context.Context) (loadcontroller.Iterator, error) { + if s.i != nil { + return s.i, nil + } + + return s, nil +} + +func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error { + s.mtx.RLock() + defer s.mtx.RUnlock() + + for _, v := range s.m { + for _, a := range v { + if f(a) { + if err := h(a); err != nil { + return err + } + } + } + } + + return nil +} + +func (s *testAnnouncementStorage) InitWriter(context.Context) (loadcontroller.Writer, error) { + if s.w != nil { + return s.w, nil + } + + return s, nil +} + +func (s *testAnnouncementStorage) Put(v container.UsedSpaceAnnouncement) error { + s.mtx.Lock() + s.m[v.Epoch()] = append(s.m[v.Epoch()], v) + s.mtx.Unlock() + + return nil +} + +func (s *testAnnouncementStorage) Close() error { + return nil +} + +func randCID() *container.ID { + h := [sha256.Size]byte{} + + rand.Read(h[:]) + + id := container.NewID() + id.SetSHA256(h) + + return id +} + +func randAnnouncement() container.UsedSpaceAnnouncement { + a := container.NewAnnouncement() + a.SetContainerID(randCID()) + a.SetUsedSpace(rand.Uint64()) + + return *a +} + +func TestSimpleScenario(t *testing.T) { + // create storage to write final estimations + resultStorage := newTestStorage() + + // create storages to accumulate announcements + accumulatingStorageN2 := newTestStorage() + + // create storage of local metrics + localStorageN1 := newTestStorage() + localStorageN2 := newTestStorage() + + // create 2 controllers: 1st writes announcements to 2nd, 2nd directly to final destination + ctrlN1 := loadcontroller.New(loadcontroller.Prm{ + LocalMetrics: localStorageN1, + AnnouncementAccumulator: newTestStorage(), + LocalAnnouncementTarget: &testAnnouncementStorage{ + w: accumulatingStorageN2, + }, + ResultReceiver: resultStorage, + }) + + ctrlN2 := loadcontroller.New(loadcontroller.Prm{ + LocalMetrics: localStorageN2, + AnnouncementAccumulator: accumulatingStorageN2, + LocalAnnouncementTarget: &testAnnouncementStorage{ + w: resultStorage, + }, + ResultReceiver: resultStorage, + }) + + const processEpoch uint64 = 10 + + const goodNum = 4 + + // create 2 random values for processing epoch and 1 for some different + announces := make([]container.UsedSpaceAnnouncement, 0, goodNum) + + for i := 0; i < goodNum; i++ { + a := randAnnouncement() + a.SetEpoch(processEpoch) + + announces = append(announces, a) + } + + // store one half of "good" announcements to 1st metrics storage, another - to 2nd + // and "bad" to both + for i := 0; i < goodNum/2; i++ { + require.NoError(t, localStorageN1.Put(announces[i])) + } + + for i := goodNum / 2; i < goodNum; i++ { + require.NoError(t, localStorageN2.Put(announces[i])) + } + + wg := new(sync.WaitGroup) + wg.Add(2) + + startPrm := loadcontroller.StartPrm{ + Epoch: processEpoch, + } + + // start both controllers + go func() { + ctrlN1.Start(startPrm) + wg.Done() + }() + + go func() { + ctrlN2.Start(startPrm) + wg.Done() + }() + + wg.Wait() + wg.Add(2) + + stopPrm := loadcontroller.StopPrm{ + Epoch: processEpoch, + } + + // stop both controllers + go func() { + ctrlN1.Stop(stopPrm) + wg.Done() + }() + + go func() { + ctrlN2.Stop(stopPrm) + wg.Done() + }() + + wg.Wait() + + // result target should contain all "good" announcements and shoult not container the "bad" one + var res []container.UsedSpaceAnnouncement + + err := resultStorage.Iterate( + func(a container.UsedSpaceAnnouncement) bool { + return true + }, + func(a container.UsedSpaceAnnouncement) error { + res = append(res, a) + return nil + }, + ) + require.NoError(t, err) + + for i := range announces { + require.Contains(t, res, announces[i]) + } +} diff --git a/pkg/services/container/announcement/load/controller/controller.go b/pkg/services/container/announcement/load/controller/controller.go new file mode 100644 index 00000000..f5f3391f --- /dev/null +++ b/pkg/services/container/announcement/load/controller/controller.go @@ -0,0 +1,94 @@ +package loadcontroller + +import ( + "context" + "fmt" + "sync" +) + +// Prm groups the required parameters of the Controller's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type Prm struct { + // Iterator over the used space values of the containers + // collected by the node locally. + LocalMetrics IteratorProvider + + // Place of recording the local values of + // the used space of containers. + LocalAnnouncementTarget WriterProvider + + // Iterator over the summarized used space scores + // from the various network participants. + AnnouncementAccumulator IteratorProvider + + // Place of recording the final estimates of + // the used space of containers. + ResultReceiver WriterProvider +} + +// Controller represents main handler for starting +// and interrupting container volume estimation. +// +// It binds the interfaces of the local value stores +// to the target storage points. Controller is abstracted +// from the internal storage device and the network location +// of the connecting components. At its core, it is a +// high-level start-stop trigger for calculations. +// +// For correct operation, the controller must be created +// using the constructor (New) based on the required parameters +// and optional components. After successful creation, +// the constructor is immediately ready to work through +// API of external control of calculations and data transfer. +type Controller struct { + prm Prm + + opts *options + + announceMtx sync.Mutex + mAnnounceCtx map[uint64]context.CancelFunc + + reportMtx sync.Mutex + mReportCtx map[uint64]context.CancelFunc +} + +const invalidPrmValFmt = "invalid parameter %s (%T):%v" + +func panicOnPrmValue(n string, v interface{}) { + panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) +} + +// New creates a new instance of the Controller. +// +// Panics if at least one value of the parameters is invalid. +// +// The created Controller does not require additional +// initialization and is completely ready for work +func New(prm Prm, opts ...Option) *Controller { + switch { + case prm.LocalMetrics == nil: + panicOnPrmValue("LocalMetrics", prm.LocalMetrics) + case prm.AnnouncementAccumulator == nil: + panicOnPrmValue("AnnouncementAccumulator", prm.AnnouncementAccumulator) + case prm.LocalAnnouncementTarget == nil: + panicOnPrmValue("LocalAnnouncementTarget", prm.LocalAnnouncementTarget) + case prm.ResultReceiver == nil: + panicOnPrmValue("ResultReceiver", prm.ResultReceiver) + } + + o := defaultOpts() + + for _, opt := range opts { + opt(o) + } + + return &Controller{ + prm: prm, + opts: o, + mAnnounceCtx: make(map[uint64]context.CancelFunc), + mReportCtx: make(map[uint64]context.CancelFunc), + } +} diff --git a/pkg/services/container/announcement/load/controller/deps.go b/pkg/services/container/announcement/load/controller/deps.go new file mode 100644 index 00000000..0a72850d --- /dev/null +++ b/pkg/services/container/announcement/load/controller/deps.go @@ -0,0 +1,88 @@ +package loadcontroller + +import ( + "context" + "io" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" +) + +// UsedSpaceHandler describes the signature of the UsedSpaceAnnouncement +// value handling function. +// +// Termination of processing without failures is usually signaled +// with a zero error, while a specific value may describe the reason +// for failure. +type UsedSpaceHandler func(container.UsedSpaceAnnouncement) error + +// UsedSpaceFilter describes the signature of the function for +// checking whether a value meets a certain criterion. +// +// Return of true means conformity, false - vice versa. +type UsedSpaceFilter func(container.UsedSpaceAnnouncement) bool + +// Iterator is a group of methods provided by entity +// which can iterate over a group of UsedSpaceAnnouncement values. +type Iterator interface { + // Iterate must start an iterator over values that + // meet the filter criterion (returns true). + // For each such value should call a handler, the error + // of which should be directly returned from the method. + // + // Internal failures of the iterator are also signaled via + // an error. After a successful call to the last value + // handler, nil should be returned. + Iterate(UsedSpaceFilter, UsedSpaceHandler) error +} + +// IteratorProvider is a group of methods provided +// by entity which generates iterators over +// UsedSpaceAnnouncement values. +type IteratorProvider interface { + // InitIterator should return an initialized Iterator. + // + // Initialization problems are reported via error. + // If no error was returned, then the Iterator must not be nil. + // + // Implementations can have different logic for different + // contexts, so specific ones may document their own behavior. + InitIterator(context.Context) (Iterator, error) +} + +// Writer describes the interface for storing UsedSpaceAnnouncement values. +// +// This interface is provided by both local storage +// of values and remote (wrappers over the RPC). +type Writer interface { + // Put performs a write operation of UsedSpaceAnnouncement value + // and returns any error encountered. + // + // All values after the Close call must be flushed to the + // physical target. Implementations can cache values before + // Close operation. + // + // Put must not be called after Close. + Put(container.UsedSpaceAnnouncement) error + + // Close exits with method-providing Writer. + // + // All cached values must be flushed before + // the Close's return. + // + // Methods must not be called after Close. + io.Closer +} + +// IteratorProvider is a group of methods provided +// by entity which generates keepers of +// UsedSpaceAnnouncement values. +type WriterProvider interface { + // InitWriter should return an initialized Writer. + // + // Initialization problems are reported via error. + // If no error was returned, then the Writer must not be nil. + // + // Implementations can have different logic for different + // contexts, so specific ones may document their own behavior. + InitWriter(context.Context) (Writer, error) +} diff --git a/pkg/services/container/announcement/load/controller/opts.go b/pkg/services/container/announcement/load/controller/opts.go new file mode 100644 index 00000000..408bfe15 --- /dev/null +++ b/pkg/services/container/announcement/load/controller/opts.go @@ -0,0 +1,28 @@ +package loadcontroller + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Option sets an optional parameter of Controller. +type Option func(*options) + +type options struct { + log *logger.Logger +} + +func defaultOpts() *options { + return &options{ + log: zap.L(), + } +} + +// WithLogger returns option to specify logging component. +func WithLogger(l *logger.Logger) Option { + return func(o *options) { + if l != nil { + o.log = l + } + } +} diff --git a/pkg/services/container/announcement/load/controller/util.go b/pkg/services/container/announcement/load/controller/util.go new file mode 100644 index 00000000..df1ca703 --- /dev/null +++ b/pkg/services/container/announcement/load/controller/util.go @@ -0,0 +1,9 @@ +package loadcontroller + +import "github.com/nspcc-dev/neofs-api-go/pkg/container" + +func usedSpaceFilterEpochEQ(epoch uint64) UsedSpaceFilter { + return func(a container.UsedSpaceAnnouncement) bool { + return a.Epoch() == epoch + } +}