diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index d54bf13d..9239df64 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -3,44 +3,22 @@ package main import ( "bytes" "context" - "crypto/ecdsa" - "crypto/sha256" - "errors" - "fmt" - "strconv" - containerV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" - netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc" containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container" - loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" - loadroute "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route" - placementrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route/placement" - loadstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/storage" containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" - containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "go.uber.org/zap" ) -const ( - startEstimationNotifyEvent = "StartEstimation" - stopEstimationNotifyEvent = "StopEstimation" -) - -func initContainerService(ctx context.Context, c *cfg) { +func initContainerService(_ context.Context, c *cfg) { // container wrapper that tries to invoke notary // requests if chain is configured so wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary()) @@ -52,44 +30,10 @@ func initContainerService(ctx context.Context, c *cfg) { cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc) - loadAccumulator := loadstorage.New(loadstorage.Prm{}) - - loadPlacementBuilder := &loadPlacementBuilder{ - log: c.log, - nmSrc: c.netMapSource, - cnrSrc: cnrSrc, - } - - routeBuilder := placementrouter.New(placementrouter.Prm{ - PlacementBuilder: loadPlacementBuilder, - }) - - loadRouter := loadroute.New( - loadroute.Prm{ - LocalServerInfo: c, - RemoteWriterProvider: &remoteLoadAnnounceProvider{ - key: &c.key.PrivateKey, - netmapKeys: c, - clientCache: c.bgClientCache, - deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator), - }, - Builder: routeBuilder, - }, - loadroute.WithLogger(c.log), - ) - - setLoadController(ctx, c, loadRouter, loadAccumulator) - server := containerTransportGRPC.New( containerService.NewSignService( &c.key.PrivateKey, - &usedSpaceService{ - Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), - loadWriterProvider: loadRouter, - loadPlacementBuilder: loadPlacementBuilder, - routeBuilder: routeBuilder, - cfg: c, - }, + containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), ), ) @@ -178,50 +122,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c return cnrRdr, cnrWrt } -func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) { - pubKey := c.key.PublicKey().Bytes() - - // container wrapper that always sends non-notary - // requests - wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0) - fatalOnErr(err) - - resultWriter := &morphLoadWriter{ - log: c.log, - cnrMorphClient: wrapperNoNotary, - key: pubKey, - } - - localMetrics := &localStorageLoad{ - log: c.log, - engine: c.cfgObject.cfgLocalStorage.localStorage, - } - - ctrl := loadcontroller.New( - loadcontroller.Prm{ - LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics), - AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator), - LocalAnnouncementTarget: loadRouter, - ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter), - }, - loadcontroller.WithLogger(c.log), - ) - - setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation) - addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) { - ctrl.Start(ctx, loadcontroller.StartPrm{ - Epoch: ev.(containerEvent.StartEstimation).Epoch(), - }) - }) - - setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation) - addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) { - ctrl.Stop(ctx, loadcontroller.StopPrm{ - Epoch: ev.(containerEvent.StopEstimation).Epoch(), - }) - }) -} - // addContainerNotificationHandler adds handler that will be executed synchronously. func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) { typ := event.TypeFromString(sTyp) @@ -284,219 +184,6 @@ func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationPar c.cfgContainer.parsers[typ] = p } -type morphLoadWriter struct { - log *logger.Logger - - cnrMorphClient *cntClient.Client - - key []byte -} - -func (w *morphLoadWriter) Put(a containerSDK.SizeEstimation) error { - w.log.Debug(logs.FrostFSNodeSaveUsedSpaceAnnouncementInContract, - zap.Uint64("epoch", a.Epoch()), - zap.Stringer("cid", a.Container()), - zap.Uint64("size", a.Value()), - ) - - prm := cntClient.AnnounceLoadPrm{} - - prm.SetAnnouncement(a) - prm.SetReporter(w.key) - - return w.cnrMorphClient.AnnounceLoad(prm) -} - -func (*morphLoadWriter) Close(context.Context) error { - return nil -} - -type nopLoadWriter struct{} - -func (nopLoadWriter) Put(containerSDK.SizeEstimation) error { - return nil -} - -func (nopLoadWriter) Close(context.Context) error { - return nil -} - -type remoteLoadAnnounceProvider struct { - key *ecdsa.PrivateKey - - netmapKeys netmapCore.AnnouncedKeys - - clientCache interface { - Get(client.NodeInfo) (client.MultiAddressClient, error) - } - - deadEndProvider loadcontroller.WriterProvider -} - -func (r *remoteLoadAnnounceProvider) InitRemote(srv loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) { - if srv == nil { - return r.deadEndProvider, nil - } - - if r.netmapKeys.IsLocalKey(srv.PublicKey()) { - // if local => return no-op writer - return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil - } - - var info client.NodeInfo - - err := client.NodeInfoFromRawNetmapElement(&info, srv) - if err != nil { - return nil, fmt.Errorf("parse client node info: %w", err) - } - - c, err := r.clientCache.Get(info) - if err != nil { - return nil, fmt.Errorf("could not initialize API client: %w", err) - } - - return &remoteLoadAnnounceWriterProvider{ - client: c, - }, nil -} - -type remoteLoadAnnounceWriterProvider struct { - client client.Client -} - -func (p *remoteLoadAnnounceWriterProvider) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) { - return &remoteLoadAnnounceWriter{ - client: p.client, - }, nil -} - -type remoteLoadAnnounceWriter struct { - client client.Client - - buf []containerSDK.SizeEstimation -} - -func (r *remoteLoadAnnounceWriter) Put(a containerSDK.SizeEstimation) error { - r.buf = append(r.buf, a) - - return nil -} - -func (r *remoteLoadAnnounceWriter) Close(ctx context.Context) error { - cliPrm := apiClient.PrmAnnounceSpace{ - Announcements: r.buf, - } - - _, err := r.client.ContainerAnnounceUsedSpace(ctx, cliPrm) - return err -} - -type loadPlacementBuilder struct { - log *logger.Logger - - nmSrc netmapCore.Source - - cnrSrc containerCore.Source -} - -func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error) { - cnrNodes, nm, err := l.buildPlacement(epoch, cnr) - if err != nil { - return nil, err - } - - const pivotPrefix = "load_announcement_" - - pivot := []byte( - pivotPrefix + strconv.FormatUint(epoch, 10), - ) - - placement, err := nm.PlacementVectors(cnrNodes, pivot) - if err != nil { - return nil, fmt.Errorf("could not build placement vectors: %w", err) - } - - return placement, nil -} - -func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) ([][]netmap.NodeInfo, *netmap.NetMap, error) { - cnr, err := l.cnrSrc.Get(idCnr) - if err != nil { - return nil, nil, err - } - - nm, err := l.nmSrc.GetNetMapByEpoch(epoch) - if err != nil { - return nil, nil, fmt.Errorf("could not get network map: %w", err) - } - - binCnr := make([]byte, sha256.Size) - idCnr.Encode(binCnr) - - cnrNodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), binCnr) - if err != nil { - return nil, nil, fmt.Errorf("could not build container nodes: %w", err) - } - - return cnrNodes, nm, nil -} - -type localStorageLoad struct { - log *logger.Logger - - engine *engine.StorageEngine -} - -func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error { - idList, err := engine.ListContainers(context.TODO(), d.engine) - if err != nil { - return fmt.Errorf("list containers on engine failure: %w", err) - } - - for i := range idList { - sz, err := engine.ContainerSize(d.engine, idList[i]) - if err != nil { - d.log.Debug(logs.FrostFSNodeFailedToCalculateContainerSizeInStorageEngine, - zap.Stringer("cid", idList[i]), - zap.String("error", err.Error()), - ) - - continue - } - - d.log.Debug(logs.FrostFSNodeContainerSizeInStorageEngineCalculatedSuccessfully, - zap.Uint64("size", sz), - zap.Stringer("cid", idList[i]), - ) - - var a containerSDK.SizeEstimation - a.SetContainer(idList[i]) - a.SetValue(sz) - - if f != nil && !f(a) { - continue - } - - if err := h(a); err != nil { - return err - } - } - - return nil -} - -type usedSpaceService struct { - containerService.Server - - loadWriterProvider loadcontroller.WriterProvider - - loadPlacementBuilder *loadPlacementBuilder - - routeBuilder loadroute.Builder - - cfg *cfg -} - func (c *cfg) PublicKey() []byte { return nodeKeyFromNetmap(c) } @@ -517,125 +204,6 @@ func (c *cfg) ExternalAddresses() []string { return c.cfgNodeInfo.localInfo.ExternalAddresses() } -func (c *usedSpaceService) PublicKey() []byte { - return nodeKeyFromNetmap(c.cfg) -} - -func (c *usedSpaceService) IterateAddresses(f func(string) bool) { - c.cfg.iterateNetworkAddresses(f) -} - -func (c *usedSpaceService) NumberOfAddresses() int { - return c.cfg.addressNum() -} - -func (c *usedSpaceService) ExternalAddresses() []string { - return c.cfg.ExternalAddresses() -} - -func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) { - var passedRoute []loadcontroller.ServerInfo - - for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() { - passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{ - key: hdr.GetBodySignature().GetKey(), - }) - } - - for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 { - passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left] - } - - passedRoute = append(passedRoute, c) - - w, err := c.loadWriterProvider.InitWriter(passedRoute) - if err != nil { - return nil, fmt.Errorf("could not initialize container's used space writer: %w", err) - } - - var est containerSDK.SizeEstimation - - for _, aV2 := range req.GetBody().GetAnnouncements() { - err = est.ReadFromV2(aV2) - if err != nil { - return nil, fmt.Errorf("invalid size announcement: %w", err) - } - - if err := c.processLoadValue(ctx, est, passedRoute, w); err != nil { - return nil, err - } - } - - respBody := new(containerV2.AnnounceUsedSpaceResponseBody) - - resp := new(containerV2.AnnounceUsedSpaceResponse) - resp.SetBody(respBody) - - c.cfg.respSvc.SetMeta(resp) - - return resp, nil -} - -var errNodeOutsideContainer = errors.New("node outside the container") - -type containerOnlyKeyRemoteServerInfo struct { - key []byte -} - -func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte { - return i.key -} - -func (*containerOnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) { -} - -func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int { - return 0 -} - -func (*containerOnlyKeyRemoteServerInfo) ExternalAddresses() []string { - return nil -} - -func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID, key []byte) (bool, error) { - cnrNodes, _, err := l.buildPlacement(epoch, cnr) - if err != nil { - return false, err - } - - for i := range cnrNodes { - for j := range cnrNodes[i] { - if bytes.Equal(cnrNodes[i][j].PublicKey(), key) { - return true, nil - } - } - } - - return false, nil -} - -func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation, - route []loadcontroller.ServerInfo, w loadcontroller.Writer) error { - fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey()) - if err != nil { - return fmt.Errorf("could not verify that the sender belongs to the container: %w", err) - } else if !fromCnr { - return errNodeOutsideContainer - } - - err = loadroute.CheckRoute(c.routeBuilder, a, route) - if err != nil { - return fmt.Errorf("wrong route of container's used space value: %w", err) - } - - err = w.Put(a) - if err != nil { - return fmt.Errorf("could not write container's used space value: %w", err) - } - - return nil -} - // implements interface required by container service provided by morph executor. type morphContainerReader struct { eacl containerCore.EACLSource diff --git a/pkg/services/container/announcement/load/controller/calls.go b/pkg/services/container/announcement/load/controller/calls.go deleted file mode 100644 index e1ed6e49..00000000 --- a/pkg/services/container/announcement/load/controller/calls.go +++ /dev/null @@ -1,307 +0,0 @@ -package loadcontroller - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" - "go.uber.org/zap" -) - -// StartPrm groups the required parameters of the Controller.Start method. -type StartPrm struct { - // Epoch number by which you want to select - // the values of the used space of containers. - Epoch uint64 -} - -type commonContext struct { - epoch uint64 - - ctrl *Controller - - log *logger.Logger -} - -type announcer struct { - commonContext -} - -// Start starts the processing of container.SizeEstimation values. -// -// Single Start operation overtakes all data from LocalMetrics to -// LocalAnnouncementTarget (Controller's parameters). -// No filter by epoch is used for the iterator, since it is expected -// that the source of metrics does not track the change of epochs. -// -// Each call acquires an announcement context for an Epoch parameter. -// At the very end of the operation, the context is released. -func (c *Controller) Start(ctx context.Context, prm StartPrm) { - var announcer *announcer - // acquire announcement - ctx, announcer = c.acquireAnnouncement(ctx, prm) - if announcer == nil { - return - } - - // finally stop and free the announcement - defer announcer.freeAnnouncement() - - // announce local values - announcer.announce(ctx) -} - -func (c *announcer) announce(ctx context.Context) { - c.log.Debug(logs.ControllerStartingToAnnounceTheValuesOfTheMetrics) - - var ( - metricsIterator Iterator - err error - ) - - // initialize iterator over locally collected metrics - metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator() - if err != nil { - c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyCollectedMetrics, - zap.String("error", err.Error()), - ) - - return - } - - // initialize target of local announcements - targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(nil) - if err != nil { - c.log.Debug(logs.ControllerCouldNotInitializeAnnouncementAccumulator, - zap.String("error", err.Error()), - ) - - return - } - - // iterate over all collected metrics and write them to the target - err = metricsIterator.Iterate( - func(container.SizeEstimation) bool { - return true // local metrics don't know about epochs - }, - func(a container.SizeEstimation) error { - a.SetEpoch(c.epoch) // set epoch explicitly - return targetWriter.Put(a) - }, - ) - if err != nil { - c.log.Debug(logs.ControllerIteratorOverLocallyCollectedMetricsAborted, - zap.String("error", err.Error()), - ) - - return - } - - // finish writing - err = targetWriter.Close(ctx) - if err != nil { - c.log.Debug(logs.ControllerCouldNotFinishWritingLocalAnnouncements, - zap.String("error", err.Error()), - ) - - return - } - - c.log.Debug(logs.ControllerTrustAnnouncementSuccessfullyFinished) -} - -func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) { - started := true - c.announceMtx.Lock() - { - if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil { - ctx, cancel = context.WithCancel(ctx) - c.mAnnounceCtx[prm.Epoch] = cancel - started = false - } - } - c.announceMtx.Unlock() - - log := &logger.Logger{Logger: c.opts.log.With( - zap.Uint64("epoch", prm.Epoch), - )} - - if started { - log.Debug(logs.ControllerAnnouncementIsAlreadyStarted) - return ctx, nil - } - - return ctx, &announcer{ - commonContext: commonContext{ - epoch: prm.Epoch, - ctrl: c, - log: log, - }, - } -} - -func (c *commonContext) freeAnnouncement() { - var stopped bool - - c.ctrl.announceMtx.Lock() - - { - var cancel context.CancelFunc - - cancel, stopped = c.ctrl.mAnnounceCtx[c.epoch] - - if stopped { - cancel() - delete(c.ctrl.mAnnounceCtx, c.epoch) - } - } - - c.ctrl.announceMtx.Unlock() - - if stopped { - c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted) - } else { - c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted) - } -} - -// StopPrm groups the required parameters of the Controller.Stop method. -type StopPrm struct { - // Epoch number the analysis of the values of which must be interrupted. - Epoch uint64 -} - -type reporter struct { - commonContext -} - -// Stop interrupts the processing of container.SizeEstimation values. -// -// Single Stop operation releases an announcement context and overtakes -// all data from AnnouncementAccumulator to ResultReceiver (Controller's -// parameters). Only values for the specified Epoch parameter are processed. -// -// Each call acquires a report context for an Epoch parameter. -// At the very end of the operation, the context is released. -func (c *Controller) Stop(ctx context.Context, prm StopPrm) { - var reporter *reporter - ctx, reporter = c.acquireReport(ctx, prm) - if reporter == nil { - return - } - - // finally stop and free reporting - defer reporter.freeReport() - - // interrupt announcement - reporter.freeAnnouncement() - - // report the estimations - reporter.report(ctx) -} - -func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) { - started := true - - c.reportMtx.Lock() - { - if cancel := c.mReportCtx[prm.Epoch]; cancel == nil { - ctx, cancel = context.WithCancel(ctx) - c.mReportCtx[prm.Epoch] = cancel - started = false - } - } - - c.reportMtx.Unlock() - - log := &logger.Logger{Logger: c.opts.log.With( - zap.Uint64("epoch", prm.Epoch), - )} - - if started { - log.Debug(logs.ControllerReportIsAlreadyStarted) - return ctx, nil - } - - return ctx, &reporter{ - commonContext: commonContext{ - epoch: prm.Epoch, - ctrl: c, - log: log, - }, - } -} - -func (c *commonContext) freeReport() { - var stopped bool - - c.ctrl.reportMtx.Lock() - - { - var cancel context.CancelFunc - - cancel, stopped = c.ctrl.mReportCtx[c.epoch] - - if stopped { - cancel() - delete(c.ctrl.mReportCtx, c.epoch) - } - } - - c.ctrl.reportMtx.Unlock() - - if stopped { - c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted) - } else { - c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted) - } -} - -func (c *reporter) report(ctx context.Context) { - var ( - localIterator Iterator - err error - ) - - // initialize iterator over locally accumulated announcements - localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator() - if err != nil { - c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyAccumulatedAnnouncements, - zap.String("error", err.Error()), - ) - - return - } - - // initialize final destination of load estimations - resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(nil) - if err != nil { - c.log.Debug(logs.ControllerCouldNotInitializeResultTarget, - zap.String("error", err.Error()), - ) - - return - } - - // iterate over all accumulated announcements and write them to the target - err = localIterator.Iterate( - usedSpaceFilterEpochEQ(c.epoch), - resultWriter.Put, - ) - if err != nil { - c.log.Debug(logs.ControllerIteratorOverLocalAnnouncementsAborted, - zap.String("error", err.Error()), - ) - - return - } - - // finish writing - err = resultWriter.Close(ctx) - if err != nil { - c.log.Debug(logs.ControllerCouldNotFinishWritingLoadEstimations, - zap.String("error", err.Error()), - ) - } -} diff --git a/pkg/services/container/announcement/load/controller/calls_test.go b/pkg/services/container/announcement/load/controller/calls_test.go deleted file mode 100644 index 6ca24e86..00000000 --- a/pkg/services/container/announcement/load/controller/calls_test.go +++ /dev/null @@ -1,192 +0,0 @@ -package loadcontroller_test - -import ( - "context" - "math/rand" - "sync" - "testing" - - loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" - cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" - "github.com/stretchr/testify/require" -) - -type testAnnouncementStorage struct { - w loadcontroller.Writer - - i loadcontroller.Iterator - - mtx sync.RWMutex - - m map[uint64][]container.SizeEstimation -} - -func newTestStorage() *testAnnouncementStorage { - return &testAnnouncementStorage{ - m: make(map[uint64][]container.SizeEstimation), - } -} - -func (s *testAnnouncementStorage) InitIterator() (loadcontroller.Iterator, error) { - if s.i != nil { - return s.i, nil - } - - return s, nil -} - -func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error { - s.mtx.RLock() - defer s.mtx.RUnlock() - - for _, v := range s.m { - for _, a := range v { - if f(a) { - if err := h(a); err != nil { - return err - } - } - } - } - - return nil -} - -func (s *testAnnouncementStorage) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) { - if s.w != nil { - return s.w, nil - } - - return s, nil -} - -func (s *testAnnouncementStorage) Put(v container.SizeEstimation) error { - s.mtx.Lock() - s.m[v.Epoch()] = append(s.m[v.Epoch()], v) - s.mtx.Unlock() - - return nil -} - -func (s *testAnnouncementStorage) Close(context.Context) error { - return nil -} - -func randAnnouncement() (a container.SizeEstimation) { - a.SetContainer(cidtest.ID()) - a.SetValue(rand.Uint64()) - - return -} - -func TestSimpleScenario(t *testing.T) { - // create storage to write final estimations - resultStorage := newTestStorage() - - // create storages to accumulate announcements - accumulatingStorageN2 := newTestStorage() - - // create storage of local metrics - localStorageN1 := newTestStorage() - localStorageN2 := newTestStorage() - - // create 2 controllers: 1st writes announcements to 2nd, 2nd directly to final destination - ctrlN1 := loadcontroller.New(loadcontroller.Prm{ - LocalMetrics: localStorageN1, - AnnouncementAccumulator: newTestStorage(), - LocalAnnouncementTarget: &testAnnouncementStorage{ - w: accumulatingStorageN2, - }, - ResultReceiver: resultStorage, - }) - - ctrlN2 := loadcontroller.New(loadcontroller.Prm{ - LocalMetrics: localStorageN2, - AnnouncementAccumulator: accumulatingStorageN2, - LocalAnnouncementTarget: &testAnnouncementStorage{ - w: resultStorage, - }, - ResultReceiver: resultStorage, - }) - - const processEpoch uint64 = 10 - - const goodNum = 4 - - // create 2 random values for processing epoch and 1 for some different - announces := make([]container.SizeEstimation, 0, goodNum) - - for i := 0; i < goodNum; i++ { - a := randAnnouncement() - a.SetEpoch(processEpoch) - - announces = append(announces, a) - } - - // store one half of "good" announcements to 1st metrics storage, another - to 2nd - // and "bad" to both - for i := 0; i < goodNum/2; i++ { - require.NoError(t, localStorageN1.Put(announces[i])) - } - - for i := goodNum / 2; i < goodNum; i++ { - require.NoError(t, localStorageN2.Put(announces[i])) - } - - wg := new(sync.WaitGroup) - wg.Add(2) - - startPrm := loadcontroller.StartPrm{ - Epoch: processEpoch, - } - - // start both controllers - go func() { - ctrlN1.Start(context.Background(), startPrm) - wg.Done() - }() - - go func() { - ctrlN2.Start(context.Background(), startPrm) - wg.Done() - }() - - wg.Wait() - wg.Add(2) - - stopPrm := loadcontroller.StopPrm{ - Epoch: processEpoch, - } - - // stop both controllers - go func() { - ctrlN1.Stop(context.Background(), stopPrm) - wg.Done() - }() - - go func() { - ctrlN2.Stop(context.Background(), stopPrm) - wg.Done() - }() - - wg.Wait() - - // result target should contain all "good" announcements and shoult not container the "bad" one - var res []container.SizeEstimation - - err := resultStorage.Iterate( - func(a container.SizeEstimation) bool { - return true - }, - func(a container.SizeEstimation) error { - res = append(res, a) - return nil - }, - ) - require.NoError(t, err) - - for i := range announces { - require.Contains(t, res, announces[i]) - } -} diff --git a/pkg/services/container/announcement/load/controller/controller.go b/pkg/services/container/announcement/load/controller/controller.go deleted file mode 100644 index ef6dbade..00000000 --- a/pkg/services/container/announcement/load/controller/controller.go +++ /dev/null @@ -1,94 +0,0 @@ -package loadcontroller - -import ( - "context" - "fmt" - "sync" -) - -// Prm groups the required parameters of the Controller's constructor. -// -// All values must comply with the requirements imposed on them. -// Passing incorrect parameter values will result in constructor -// failure (error or panic depending on the implementation). -type Prm struct { - // Iterator over the used space values of the containers - // collected by the node locally. - LocalMetrics IteratorProvider - - // Place of recording the local values of - // the used space of containers. - LocalAnnouncementTarget WriterProvider - - // Iterator over the summarized used space scores - // from the various network participants. - AnnouncementAccumulator IteratorProvider - - // Place of recording the final estimates of - // the used space of containers. - ResultReceiver WriterProvider -} - -// Controller represents main handler for starting -// and interrupting container volume estimation. -// -// It binds the interfaces of the local value stores -// to the target storage points. Controller is abstracted -// from the internal storage device and the network location -// of the connecting components. At its core, it is a -// high-level start-stop trigger for calculations. -// -// For correct operation, the controller must be created -// using the constructor (New) based on the required parameters -// and optional components. After successful creation, -// the constructor is immediately ready to work through -// API of external control of calculations and data transfer. -type Controller struct { - prm Prm - - opts *options - - announceMtx sync.Mutex - mAnnounceCtx map[uint64]context.CancelFunc - - reportMtx sync.Mutex - mReportCtx map[uint64]context.CancelFunc -} - -const invalidPrmValFmt = "invalid parameter %s (%T):%v" - -func panicOnPrmValue(n string, v any) { - panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) -} - -// New creates a new instance of the Controller. -// -// Panics if at least one value of the parameters is invalid. -// -// The created Controller does not require additional -// initialization and is completely ready for work. -func New(prm Prm, opts ...Option) *Controller { - switch { - case prm.LocalMetrics == nil: - panicOnPrmValue("LocalMetrics", prm.LocalMetrics) - case prm.AnnouncementAccumulator == nil: - panicOnPrmValue("AnnouncementAccumulator", prm.AnnouncementAccumulator) - case prm.LocalAnnouncementTarget == nil: - panicOnPrmValue("LocalAnnouncementTarget", prm.LocalAnnouncementTarget) - case prm.ResultReceiver == nil: - panicOnPrmValue("ResultReceiver", prm.ResultReceiver) - } - - o := defaultOpts() - - for _, opt := range opts { - opt(o) - } - - return &Controller{ - prm: prm, - opts: o, - mAnnounceCtx: make(map[uint64]context.CancelFunc), - mReportCtx: make(map[uint64]context.CancelFunc), - } -} diff --git a/pkg/services/container/announcement/load/controller/deps.go b/pkg/services/container/announcement/load/controller/deps.go deleted file mode 100644 index 99da8594..00000000 --- a/pkg/services/container/announcement/load/controller/deps.go +++ /dev/null @@ -1,103 +0,0 @@ -package loadcontroller - -import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" -) - -// UsedSpaceHandler describes the signature of the container.SizeEstimation -// value handling function. -// -// Termination of processing without failures is usually signaled -// with a zero error, while a specific value may describe the reason -// for failure. -type UsedSpaceHandler func(container.SizeEstimation) error - -// UsedSpaceFilter describes the signature of the function for -// checking whether a value meets a certain criterion. -// -// Return of true means conformity, false - vice versa. -type UsedSpaceFilter func(container.SizeEstimation) bool - -// Iterator is a group of methods provided by entity -// which can iterate over a group of container.SizeEstimation values. -type Iterator interface { - // Iterate must start an iterator over values that - // meet the filter criterion (returns true). - // For each such value should call a handler, the error - // of which should be directly returned from the method. - // - // Internal failures of the iterator are also signaled via - // an error. After a successful call to the last value - // handler, nil should be returned. - Iterate(UsedSpaceFilter, UsedSpaceHandler) error -} - -// IteratorProvider is a group of methods provided -// by entity which generates iterators over -// container.SizeEstimation values. -type IteratorProvider interface { - // InitIterator should return an initialized Iterator. - // - // Initialization problems are reported via error. - // If no error was returned, then the Iterator must not be nil. - // - // Implementations can have different logic for different - // contexts, so specific ones may document their own behavior. - InitIterator() (Iterator, error) -} - -// Writer describes the interface for storing container.SizeEstimation values. -// -// This interface is provided by both local storage -// of values and remote (wrappers over the RPC). -type Writer interface { - // Put performs a write operation of container.SizeEstimation value - // and returns any error encountered. - // - // All values after the Close call must be flushed to the - // physical target. Implementations can cache values before - // Close operation. - // - // Put must not be called after Close. - Put(container.SizeEstimation) error - - // Close exits with method-providing Writer. - // - // All cached values must be flushed before - // the Close's return. - // - // Methods must not be called after Close. - Close(ctx context.Context) error -} - -// WriterProvider is a group of methods provided -// by entity which generates keepers of -// container.SizeEstimation values. -type WriterProvider interface { - // InitWriter should return an initialized Writer. - // - // Initialization problems are reported via error. - // If no error was returned, then the Writer must not be nil. - InitWriter(route []ServerInfo) (Writer, error) -} - -// ServerInfo describes a set of -// characteristics of a point in a route. -type ServerInfo interface { - // PublicKey returns public key of the node - // from the route in a binary representation. - PublicKey() []byte - - // Iterates over network addresses of the node - // in the route. Breaks iterating on true return - // of the handler. - IterateAddresses(func(string) bool) - - // Returns number of server's network addresses. - NumberOfAddresses() int - - // ExternalAddresses returns external node's addresses. - ExternalAddresses() []string -} diff --git a/pkg/services/container/announcement/load/controller/opts.go b/pkg/services/container/announcement/load/controller/opts.go deleted file mode 100644 index 29148def..00000000 --- a/pkg/services/container/announcement/load/controller/opts.go +++ /dev/null @@ -1,28 +0,0 @@ -package loadcontroller - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "go.uber.org/zap" -) - -// Option sets an optional parameter of Controller. -type Option func(*options) - -type options struct { - log *logger.Logger -} - -func defaultOpts() *options { - return &options{ - log: &logger.Logger{Logger: zap.L()}, - } -} - -// WithLogger returns option to specify logging component. -func WithLogger(l *logger.Logger) Option { - return func(o *options) { - if l != nil { - o.log = l - } - } -} diff --git a/pkg/services/container/announcement/load/controller/util.go b/pkg/services/container/announcement/load/controller/util.go deleted file mode 100644 index 223de13b..00000000 --- a/pkg/services/container/announcement/load/controller/util.go +++ /dev/null @@ -1,36 +0,0 @@ -package loadcontroller - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" -) - -func usedSpaceFilterEpochEQ(epoch uint64) UsedSpaceFilter { - return func(a container.SizeEstimation) bool { - return a.Epoch() == epoch - } -} - -type storageWrapper struct { - w Writer - i Iterator -} - -func (s storageWrapper) InitIterator() (Iterator, error) { - return s.i, nil -} - -func (s storageWrapper) InitWriter([]ServerInfo) (Writer, error) { - return s.w, nil -} - -func SimpleIteratorProvider(i Iterator) IteratorProvider { - return &storageWrapper{ - i: i, - } -} - -func SimpleWriterProvider(w Writer) WriterProvider { - return &storageWrapper{ - w: w, - } -} diff --git a/pkg/services/container/announcement/load/route/calls.go b/pkg/services/container/announcement/load/route/calls.go deleted file mode 100644 index 9a483aed..00000000 --- a/pkg/services/container/announcement/load/route/calls.go +++ /dev/null @@ -1,145 +0,0 @@ -package loadroute - -import ( - "context" - "encoding/hex" - "sync" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" - "go.uber.org/zap" -) - -// InitWriter initializes and returns Writer that sends each value to its next route point. -// -// If route is present, then it is taken into account, -// and the value will be sent to its continuation. Otherwise, the route will be laid -// from scratch and the value will be sent to its primary point. -// -// After building a list of remote points of the next leg of the route, the value is sent -// sequentially to all of them. If any transmissions (even all) fail, an error will not -// be returned. -// -// Close of the composed Writer calls Close method on each internal Writer generated in -// runtime and never returns an error. -// -// Always returns nil error. -func (r *Router) InitWriter(route []loadcontroller.ServerInfo) (loadcontroller.Writer, error) { - if len(route) == 0 { - route = []loadcontroller.ServerInfo{r.localSrvInfo} - } - - return &loadWriter{ - router: r, - route: route, - mRoute: make(map[routeKey]*valuesRoute), - mServers: make(map[string]loadcontroller.Writer), - }, nil -} - -type routeKey struct { - epoch uint64 - - cid string -} - -type valuesRoute struct { - route []loadcontroller.ServerInfo - - values []container.SizeEstimation -} - -type loadWriter struct { - router *Router - - route []loadcontroller.ServerInfo - - routeMtx sync.RWMutex - mRoute map[routeKey]*valuesRoute - - mServers map[string]loadcontroller.Writer -} - -func (w *loadWriter) Put(a container.SizeEstimation) error { - w.routeMtx.Lock() - defer w.routeMtx.Unlock() - - key := routeKey{ - epoch: a.Epoch(), - cid: a.Container().EncodeToString(), - } - - routeValues, ok := w.mRoute[key] - if !ok { - route, err := w.router.routeBuilder.NextStage(a, w.route) - if err != nil { - return err - } else if len(route) == 0 { - route = []loadcontroller.ServerInfo{nil} - } - - routeValues = &valuesRoute{ - route: route, - values: []container.SizeEstimation{a}, - } - - w.mRoute[key] = routeValues - } - - for _, remoteInfo := range routeValues.route { - var key string - - if remoteInfo != nil { - key = hex.EncodeToString(remoteInfo.PublicKey()) - } - - remoteWriter, ok := w.mServers[key] - if !ok { - provider, err := w.router.remoteProvider.InitRemote(remoteInfo) - if err != nil { - w.router.log.Debug(logs.RouteCouldNotInitializeWriterProvider, - zap.String("error", err.Error()), - ) - - continue // best effort - } - - remoteWriter, err = provider.InitWriter(w.route) - if err != nil { - w.router.log.Debug(logs.RouteCouldNotInitializeWriter, - zap.String("error", err.Error()), - ) - - continue // best effort - } - - w.mServers[key] = remoteWriter - } - - err := remoteWriter.Put(a) - if err != nil { - w.router.log.Debug(logs.RouteCouldNotPutTheValue, - zap.String("error", err.Error()), - ) - } - - // continue best effort - } - - return nil -} - -func (w *loadWriter) Close(ctx context.Context) error { - for key, wRemote := range w.mServers { - err := wRemote.Close(ctx) - if err != nil { - w.router.log.Debug(logs.RouteCouldNotCloseRemoteServerWriter, - zap.String("key", key), - zap.String("error", err.Error()), - ) - } - } - - return nil -} diff --git a/pkg/services/container/announcement/load/route/deps.go b/pkg/services/container/announcement/load/route/deps.go deleted file mode 100644 index b255900f..00000000 --- a/pkg/services/container/announcement/load/route/deps.go +++ /dev/null @@ -1,31 +0,0 @@ -package loadroute - -import ( - loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" -) - -// Builder groups methods to route values in the network. -type Builder interface { - // NextStage must return next group of route points for the value a - // based on the passed route. - // - // Empty passed list means being at the starting point of the route. - // - // Must return empty list and no error if the endpoint of the route is reached. - // If there are more than one point to go and the last passed point is included - // in that list (means that point is the last point in one of the route groups), - // returned route must contain nil point that should be interpreted as signal to, - // among sending to other route points, save the announcement in that point. - NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) -} - -// RemoteWriterProvider describes the component -// for sending values to a fixed route point. -type RemoteWriterProvider interface { - // InitRemote must return WriterProvider to the route point - // corresponding to info. - // - // Nil info matches the end of the route. - InitRemote(info loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) -} diff --git a/pkg/services/container/announcement/load/route/opts.go b/pkg/services/container/announcement/load/route/opts.go deleted file mode 100644 index ab140ab4..00000000 --- a/pkg/services/container/announcement/load/route/opts.go +++ /dev/null @@ -1,28 +0,0 @@ -package loadroute - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "go.uber.org/zap" -) - -// Option sets an optional parameter of Router. -type Option func(*options) - -type options struct { - log *logger.Logger -} - -func defaultOpts() *options { - return &options{ - log: &logger.Logger{Logger: zap.L()}, - } -} - -// WithLogger returns Option to specify logging component. -func WithLogger(l *logger.Logger) Option { - return func(o *options) { - if l != nil { - o.log = l - } - } -} diff --git a/pkg/services/container/announcement/load/route/placement/builder.go b/pkg/services/container/announcement/load/route/placement/builder.go deleted file mode 100644 index 493b8972..00000000 --- a/pkg/services/container/announcement/load/route/placement/builder.go +++ /dev/null @@ -1,49 +0,0 @@ -package placementrouter - -import "fmt" - -// Prm groups the required parameters of the Builder's constructor. -// -// All values must comply with the requirements imposed on them. -// Passing incorrect parameter values will result in constructor -// failure (error or panic depending on the implementation). -type Prm struct { - // Calculator of the container members. - // - // Must not be nil. - PlacementBuilder PlacementBuilder -} - -// Builder represents component that routes used container space -// values between nodes from the container. -// -// For correct operation, Builder must be created using -// the constructor (New) based on the required parameters -// and optional components. After successful creation, -// the Builder is immediately ready to work through API. -type Builder struct { - placementBuilder PlacementBuilder -} - -const invalidPrmValFmt = "invalid parameter %s (%T):%v" - -func panicOnPrmValue(n string, v any) { - panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) -} - -// New creates a new instance of the Builder. -// -// Panics if at least one value of the parameters is invalid. -// -// The created Builder does not require additional -// initialization and is completely ready for work. -func New(prm Prm) *Builder { - switch { - case prm.PlacementBuilder == nil: - panicOnPrmValue("PlacementBuilder", prm.PlacementBuilder) - } - - return &Builder{ - placementBuilder: prm.PlacementBuilder, - } -} diff --git a/pkg/services/container/announcement/load/route/placement/calls.go b/pkg/services/container/announcement/load/route/placement/calls.go deleted file mode 100644 index 68bdb43a..00000000 --- a/pkg/services/container/announcement/load/route/placement/calls.go +++ /dev/null @@ -1,47 +0,0 @@ -package placementrouter - -import ( - "bytes" - "fmt" - - netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" -) - -// NextStage composes container nodes for the container and epoch from a, -// and returns the list of nodes with maximum weight (one from each vector). -// -// If passed route has more than one point, then endpoint of the route is reached. -// -// The traversed route is not checked, it is assumed to be correct. -func (b *Builder) NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) { - if len(passed) > 1 { - return nil, nil - } - - cnr := a.Container() - - placement, err := b.placementBuilder.BuildPlacement(a.Epoch(), cnr) - if err != nil { - return nil, fmt.Errorf("could not build placement %s: %w", cnr, err) - } - - res := make([]loadcontroller.ServerInfo, 0, len(placement)) - - for i := range placement { - if len(placement[i]) == 0 { - continue - } - - if len(passed) == 1 && bytes.Equal(passed[0].PublicKey(), placement[i][0].PublicKey()) { - // add nil element so the announcement will be saved in local memory - res = append(res, nil) - } else { - // add element with remote node to send announcement to - res = append(res, netmapcore.Node(placement[i][0])) - } - } - - return res, nil -} diff --git a/pkg/services/container/announcement/load/route/placement/deps.go b/pkg/services/container/announcement/load/route/placement/deps.go deleted file mode 100644 index 43339eb4..00000000 --- a/pkg/services/container/announcement/load/route/placement/deps.go +++ /dev/null @@ -1,14 +0,0 @@ -package placementrouter - -import ( - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" -) - -// PlacementBuilder describes interface of FrostFS placement calculator. -type PlacementBuilder interface { - // BuildPlacement must compose and sort (according to a specific algorithm) - // storage nodes from the container by its identifier using network map - // of particular epoch. - BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error) -} diff --git a/pkg/services/container/announcement/load/route/router.go b/pkg/services/container/announcement/load/route/router.go deleted file mode 100644 index c8f784b1..00000000 --- a/pkg/services/container/announcement/load/route/router.go +++ /dev/null @@ -1,87 +0,0 @@ -package loadroute - -import ( - "fmt" - - loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" -) - -// Prm groups the required parameters of the Router's constructor. -// -// All values must comply with the requirements imposed on them. -// Passing incorrect parameter values will result in constructor -// failure (error or panic depending on the implementation). -type Prm struct { - // Characteristics of the local node's server. - // - // Must not be nil. - LocalServerInfo loadcontroller.ServerInfo - - // Component for sending values to a fixed route point. - // - // Must not be nil. - RemoteWriterProvider RemoteWriterProvider - - // Route planner. - // - // Must not be nil. - Builder Builder -} - -// Router represents component responsible for routing -// used container space values over the network. -// -// For each fixed pair (container ID, epoch) there is a -// single value route on the network. Router provides the -// interface for writing values to the next point of the route. -// -// For correct operation, Router must be created using -// the constructor (New) based on the required parameters -// and optional components. After successful creation, -// the Router is immediately ready to work through API. -type Router struct { - log *logger.Logger - - remoteProvider RemoteWriterProvider - - routeBuilder Builder - - localSrvInfo loadcontroller.ServerInfo -} - -const invalidPrmValFmt = "invalid parameter %s (%T):%v" - -func panicOnPrmValue(n string, v any) { - panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) -} - -// New creates a new instance of the Router. -// -// Panics if at least one value of the parameters is invalid. -// -// The created Router does not require additional -// initialization and is completely ready for work. -func New(prm Prm, opts ...Option) *Router { - switch { - case prm.RemoteWriterProvider == nil: - panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider) - case prm.Builder == nil: - panicOnPrmValue("Builder", prm.Builder) - case prm.LocalServerInfo == nil: - panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo) - } - - o := defaultOpts() - - for i := range opts { - opts[i](o) - } - - return &Router{ - log: o.log, - remoteProvider: prm.RemoteWriterProvider, - routeBuilder: prm.Builder, - localSrvInfo: prm.LocalServerInfo, - } -} diff --git a/pkg/services/container/announcement/load/route/util.go b/pkg/services/container/announcement/load/route/util.go deleted file mode 100644 index ea0f51aa..00000000 --- a/pkg/services/container/announcement/load/route/util.go +++ /dev/null @@ -1,49 +0,0 @@ -package loadroute - -import ( - "bytes" - "errors" - - loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" -) - -var errWrongRoute = errors.New("wrong route") - -// CheckRoute checks if the route is a route correctly constructed by the builder for value a. -// -// Returns nil if route is correct, otherwise an error clarifying the inconsistency. -func CheckRoute(builder Builder, a container.SizeEstimation, route []loadcontroller.ServerInfo) error { - for i := 1; i < len(route); i++ { - servers, err := builder.NextStage(a, route[:i]) - if err != nil { - return err - } else if len(servers) == 0 { - break - } - - found := false - - for j := range servers { - if servers[j] == nil { - // nil route point means that - // (i-1)-th node in the route - // must, among other things, - // save the announcement to its - // local memory - continue - } - - if bytes.Equal(servers[j].PublicKey(), route[i].PublicKey()) { - found = true - break - } - } - - if !found { - return errWrongRoute - } - } - - return nil -} diff --git a/pkg/services/container/announcement/load/storage/storage.go b/pkg/services/container/announcement/load/storage/storage.go deleted file mode 100644 index 4d3104c7..00000000 --- a/pkg/services/container/announcement/load/storage/storage.go +++ /dev/null @@ -1,151 +0,0 @@ -package loadstorage - -import ( - "context" - "sort" - "sync" - - loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" -) - -type usedSpaceEstimations struct { - announcement container.SizeEstimation - - sizes []uint64 -} - -type storageKey struct { - epoch uint64 - - cid string -} - -// Storage represents in-memory storage of -// container.SizeEstimation values. -// -// The write operation has the usual behavior - to save -// the next number of used container space for a specific epoch. -// All values related to one key (epoch, container ID) are stored -// as a list. -// -// Storage also provides an iterator interface, into the handler -// of which the final score is passed, built on all values saved -// at the time of the call. Currently the only possible estimation -// formula is used - the average between 10th and 90th percentile. -// -// For correct operation, Storage must be created -// using the constructor (New) based on the required parameters -// and optional components. After successful creation, -// Storage is immediately ready to work through API. -type Storage struct { - mtx sync.RWMutex - - mItems map[storageKey]*usedSpaceEstimations -} - -// Prm groups the required parameters of the Storage's constructor. -// -// The component is not parameterizable at the moment. -type Prm struct{} - -// New creates a new instance of the Storage. -// -// The created Storage does not require additional -// initialization and is completely ready for work. -func New(_ Prm) *Storage { - return &Storage{ - mItems: make(map[storageKey]*usedSpaceEstimations), - } -} - -// Put appends the next value of the occupied container space for the epoch -// to the list of already saved values. -// -// Always returns nil error. -func (s *Storage) Put(a container.SizeEstimation) error { - s.mtx.Lock() - - { - key := storageKey{ - epoch: a.Epoch(), - cid: a.Container().EncodeToString(), - } - - estimations, ok := s.mItems[key] - if !ok { - estimations = &usedSpaceEstimations{ - announcement: a, - sizes: make([]uint64, 0, 1), - } - - s.mItems[key] = estimations - } - - estimations.sizes = append(estimations.sizes, a.Value()) - } - - s.mtx.Unlock() - - return nil -} - -func (s *Storage) Close(context.Context) error { - return nil -} - -// Iterate goes through all the lists with the key (container ID, epoch), -// calculates the final grade for all values, and passes it to the handler. -// -// Final grade is the average between 10th and 90th percentiles. -func (s *Storage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) (err error) { - s.mtx.RLock() - - { - for _, v := range s.mItems { - if f(v.announcement) { - // calculate estimation based on 90th percentile - v.announcement.SetValue(finalEstimation(v.sizes)) - - if err = h(v.announcement); err != nil { - break - } - } - } - } - - s.mtx.RUnlock() - - return -} - -func finalEstimation(vals []uint64) uint64 { - sort.Slice(vals, func(i, j int) bool { - return vals[i] < vals[j] - }) - - const ( - lowerRank = 10 - upperRank = 90 - ) - - if len(vals) >= lowerRank { - lowerInd := percentile(lowerRank, vals) - upperInd := percentile(upperRank, vals) - - vals = vals[lowerInd:upperInd] - } - - sum := uint64(0) - - for i := range vals { - sum += vals[i] - } - - return sum / uint64(len(vals)) -} - -func percentile(rank int, vals []uint64) int { - p := len(vals) * rank / 100 - return p -} diff --git a/pkg/services/container/announcement/load/storage/storage_test.go b/pkg/services/container/announcement/load/storage/storage_test.go deleted file mode 100644 index 20e73627..00000000 --- a/pkg/services/container/announcement/load/storage/storage_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package loadstorage - -import ( - "math/rand" - "testing" - - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" - cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" - "github.com/stretchr/testify/require" -) - -func TestStorage(t *testing.T) { - const epoch uint64 = 13 - - var a container.SizeEstimation - a.SetContainer(cidtest.ID()) - a.SetEpoch(epoch) - - const opinionsNum = 100 - - s := New(Prm{}) - - opinions := make([]uint64, opinionsNum) - for i := range opinions { - opinions[i] = rand.Uint64() - - a.SetValue(opinions[i]) - - require.NoError(t, s.Put(a)) - } - - iterCounter := 0 - - err := s.Iterate( - func(ai container.SizeEstimation) bool { - return ai.Epoch() == epoch - }, - func(ai container.SizeEstimation) error { - iterCounter++ - - require.Equal(t, epoch, ai.Epoch()) - require.Equal(t, a.Container(), ai.Container()) - require.Equal(t, finalEstimation(opinions), ai.Value()) - - return nil - }, - ) - require.NoError(t, err) - require.Equal(t, 1, iterCounter) -}