containersvc: Remove load announcement (SUPPORT) #732

Merged
fyrchik merged 1 commit from fyrchik/frostfs-node:support/v0.37 into support/v0.37 2023-10-09 19:00:31 +00:00
17 changed files with 2 additions and 1845 deletions

View file

@ -3,44 +3,22 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
"strconv"
containerV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc" containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc" containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container" containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
loadroute "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route"
placementrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route/placement"
loadstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/storage"
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph" containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap" "go.uber.org/zap"
) )
const ( func initContainerService(_ context.Context, c *cfg) {
startEstimationNotifyEvent = "StartEstimation"
stopEstimationNotifyEvent = "StopEstimation"
)
func initContainerService(ctx context.Context, c *cfg) {
// container wrapper that tries to invoke notary // container wrapper that tries to invoke notary
// requests if chain is configured so // requests if chain is configured so
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary()) wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
@ -52,44 +30,10 @@ func initContainerService(ctx context.Context, c *cfg) {
cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc) cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)
loadAccumulator := loadstorage.New(loadstorage.Prm{})
loadPlacementBuilder := &loadPlacementBuilder{
log: c.log,
nmSrc: c.netMapSource,
cnrSrc: cnrSrc,
}
routeBuilder := placementrouter.New(placementrouter.Prm{
PlacementBuilder: loadPlacementBuilder,
})
loadRouter := loadroute.New(
loadroute.Prm{
LocalServerInfo: c,
RemoteWriterProvider: &remoteLoadAnnounceProvider{
key: &c.key.PrivateKey,
netmapKeys: c,
clientCache: c.bgClientCache,
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
},
Builder: routeBuilder,
},
loadroute.WithLogger(c.log),
)
setLoadController(ctx, c, loadRouter, loadAccumulator)
server := containerTransportGRPC.New( server := containerTransportGRPC.New(
containerService.NewSignService( containerService.NewSignService(
&c.key.PrivateKey, &c.key.PrivateKey,
&usedSpaceService{ containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
), ),
) )
@ -178,50 +122,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
return cnrRdr, cnrWrt return cnrRdr, cnrWrt
} }
func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
pubKey := c.key.PublicKey().Bytes()
// container wrapper that always sends non-notary
// requests
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
fatalOnErr(err)
resultWriter := &morphLoadWriter{
log: c.log,
cnrMorphClient: wrapperNoNotary,
key: pubKey,
}
localMetrics := &localStorageLoad{
log: c.log,
engine: c.cfgObject.cfgLocalStorage.localStorage,
}
ctrl := loadcontroller.New(
loadcontroller.Prm{
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator),
LocalAnnouncementTarget: loadRouter,
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
},
loadcontroller.WithLogger(c.log),
)
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
ctrl.Start(ctx, loadcontroller.StartPrm{
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
})
})
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
ctrl.Stop(ctx, loadcontroller.StopPrm{
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
})
})
}
// addContainerNotificationHandler adds handler that will be executed synchronously. // addContainerNotificationHandler adds handler that will be executed synchronously.
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) { func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
typ := event.TypeFromString(sTyp) typ := event.TypeFromString(sTyp)
@ -284,219 +184,6 @@ func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationPar
c.cfgContainer.parsers[typ] = p c.cfgContainer.parsers[typ] = p
} }
type morphLoadWriter struct {
log *logger.Logger
cnrMorphClient *cntClient.Client
key []byte
}
func (w *morphLoadWriter) Put(a containerSDK.SizeEstimation) error {
w.log.Debug(logs.FrostFSNodeSaveUsedSpaceAnnouncementInContract,
zap.Uint64("epoch", a.Epoch()),
zap.Stringer("cid", a.Container()),
zap.Uint64("size", a.Value()),
)
prm := cntClient.AnnounceLoadPrm{}
prm.SetAnnouncement(a)
prm.SetReporter(w.key)
return w.cnrMorphClient.AnnounceLoad(prm)
}
func (*morphLoadWriter) Close(context.Context) error {
return nil
}
type nopLoadWriter struct{}
func (nopLoadWriter) Put(containerSDK.SizeEstimation) error {
return nil
}
func (nopLoadWriter) Close(context.Context) error {
return nil
}
type remoteLoadAnnounceProvider struct {
key *ecdsa.PrivateKey
netmapKeys netmapCore.AnnouncedKeys
clientCache interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
deadEndProvider loadcontroller.WriterProvider
}
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) {
if srv == nil {
return r.deadEndProvider, nil
}
if r.netmapKeys.IsLocalKey(srv.PublicKey()) {
// if local => return no-op writer
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
}
var info client.NodeInfo
err := client.NodeInfoFromRawNetmapElement(&info, srv)
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
c, err := r.clientCache.Get(info)
if err != nil {
return nil, fmt.Errorf("could not initialize API client: %w", err)
}
return &remoteLoadAnnounceWriterProvider{
client: c,
}, nil
}
type remoteLoadAnnounceWriterProvider struct {
client client.Client
}
func (p *remoteLoadAnnounceWriterProvider) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
return &remoteLoadAnnounceWriter{
client: p.client,
}, nil
}
type remoteLoadAnnounceWriter struct {
client client.Client
buf []containerSDK.SizeEstimation
}
func (r *remoteLoadAnnounceWriter) Put(a containerSDK.SizeEstimation) error {
r.buf = append(r.buf, a)
return nil
}
func (r *remoteLoadAnnounceWriter) Close(ctx context.Context) error {
cliPrm := apiClient.PrmAnnounceSpace{
Announcements: r.buf,
}
_, err := r.client.ContainerAnnounceUsedSpace(ctx, cliPrm)
return err
}
type loadPlacementBuilder struct {
log *logger.Logger
nmSrc netmapCore.Source
cnrSrc containerCore.Source
}
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error) {
cnrNodes, nm, err := l.buildPlacement(epoch, cnr)
if err != nil {
return nil, err
}
const pivotPrefix = "load_announcement_"
pivot := []byte(
pivotPrefix + strconv.FormatUint(epoch, 10),
)
placement, err := nm.PlacementVectors(cnrNodes, pivot)
if err != nil {
return nil, fmt.Errorf("could not build placement vectors: %w", err)
}
return placement, nil
}
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) ([][]netmap.NodeInfo, *netmap.NetMap, error) {
cnr, err := l.cnrSrc.Get(idCnr)
if err != nil {
return nil, nil, err
}
nm, err := l.nmSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, nil, fmt.Errorf("could not get network map: %w", err)
}
binCnr := make([]byte, sha256.Size)
idCnr.Encode(binCnr)
cnrNodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, nil, fmt.Errorf("could not build container nodes: %w", err)
}
return cnrNodes, nm, nil
}
type localStorageLoad struct {
log *logger.Logger
engine *engine.StorageEngine
}
func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
idList, err := engine.ListContainers(context.TODO(), d.engine)
if err != nil {
return fmt.Errorf("list containers on engine failure: %w", err)
}
for i := range idList {
sz, err := engine.ContainerSize(d.engine, idList[i])
if err != nil {
d.log.Debug(logs.FrostFSNodeFailedToCalculateContainerSizeInStorageEngine,
zap.Stringer("cid", idList[i]),
zap.String("error", err.Error()),
)
continue
}
d.log.Debug(logs.FrostFSNodeContainerSizeInStorageEngineCalculatedSuccessfully,
zap.Uint64("size", sz),
zap.Stringer("cid", idList[i]),
)
var a containerSDK.SizeEstimation
a.SetContainer(idList[i])
a.SetValue(sz)
if f != nil && !f(a) {
continue
}
if err := h(a); err != nil {
return err
}
}
return nil
}
type usedSpaceService struct {
containerService.Server
loadWriterProvider loadcontroller.WriterProvider
loadPlacementBuilder *loadPlacementBuilder
routeBuilder loadroute.Builder
cfg *cfg
}
func (c *cfg) PublicKey() []byte { func (c *cfg) PublicKey() []byte {
return nodeKeyFromNetmap(c) return nodeKeyFromNetmap(c)
} }
@ -517,125 +204,6 @@ func (c *cfg) ExternalAddresses() []string {
return c.cfgNodeInfo.localInfo.ExternalAddresses() return c.cfgNodeInfo.localInfo.ExternalAddresses()
} }
func (c *usedSpaceService) PublicKey() []byte {
return nodeKeyFromNetmap(c.cfg)
}
func (c *usedSpaceService) IterateAddresses(f func(string) bool) {
c.cfg.iterateNetworkAddresses(f)
}
func (c *usedSpaceService) NumberOfAddresses() int {
return c.cfg.addressNum()
}
func (c *usedSpaceService) ExternalAddresses() []string {
return c.cfg.ExternalAddresses()
}
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
var passedRoute []loadcontroller.ServerInfo
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
key: hdr.GetBodySignature().GetKey(),
})
}
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
}
passedRoute = append(passedRoute, c)
w, err := c.loadWriterProvider.InitWriter(passedRoute)
if err != nil {
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
}
var est containerSDK.SizeEstimation
for _, aV2 := range req.GetBody().GetAnnouncements() {
err = est.ReadFromV2(aV2)
if err != nil {
return nil, fmt.Errorf("invalid size announcement: %w", err)
}
if err := c.processLoadValue(ctx, est, passedRoute, w); err != nil {
return nil, err
}
}
respBody := new(containerV2.AnnounceUsedSpaceResponseBody)
resp := new(containerV2.AnnounceUsedSpaceResponse)
resp.SetBody(respBody)
c.cfg.respSvc.SetMeta(resp)
return resp, nil
}
var errNodeOutsideContainer = errors.New("node outside the container")
type containerOnlyKeyRemoteServerInfo struct {
key []byte
}
func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte {
return i.key
}
func (*containerOnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) {
}
func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int {
return 0
}
func (*containerOnlyKeyRemoteServerInfo) ExternalAddresses() []string {
return nil
}
func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID, key []byte) (bool, error) {
cnrNodes, _, err := l.buildPlacement(epoch, cnr)
if err != nil {
return false, err
}
for i := range cnrNodes {
for j := range cnrNodes[i] {
if bytes.Equal(cnrNodes[i][j].PublicKey(), key) {
return true, nil
}
}
}
return false, nil
}
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation,
route []loadcontroller.ServerInfo, w loadcontroller.Writer) error {
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey())
if err != nil {
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
} else if !fromCnr {
return errNodeOutsideContainer
}
err = loadroute.CheckRoute(c.routeBuilder, a, route)
if err != nil {
return fmt.Errorf("wrong route of container's used space value: %w", err)
}
err = w.Put(a)
if err != nil {
return fmt.Errorf("could not write container's used space value: %w", err)
}
return nil
}
// implements interface required by container service provided by morph executor. // implements interface required by container service provided by morph executor.
type morphContainerReader struct { type morphContainerReader struct {
eacl containerCore.EACLSource eacl containerCore.EACLSource

View file

@ -1,307 +0,0 @@
package loadcontroller
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"go.uber.org/zap"
)
// StartPrm groups the required parameters of the Controller.Start method.
type StartPrm struct {
// Epoch number by which you want to select
// the values of the used space of containers.
Epoch uint64
}
type commonContext struct {
epoch uint64
ctrl *Controller
log *logger.Logger
}
type announcer struct {
commonContext
}
// Start starts the processing of container.SizeEstimation values.
//
// Single Start operation overtakes all data from LocalMetrics to
// LocalAnnouncementTarget (Controller's parameters).
// No filter by epoch is used for the iterator, since it is expected
// that the source of metrics does not track the change of epochs.
//
// Each call acquires an announcement context for an Epoch parameter.
// At the very end of the operation, the context is released.
func (c *Controller) Start(ctx context.Context, prm StartPrm) {
var announcer *announcer
// acquire announcement
ctx, announcer = c.acquireAnnouncement(ctx, prm)
if announcer == nil {
return
}
// finally stop and free the announcement
defer announcer.freeAnnouncement()
// announce local values
announcer.announce(ctx)
}
func (c *announcer) announce(ctx context.Context) {
c.log.Debug(logs.ControllerStartingToAnnounceTheValuesOfTheMetrics)
var (
metricsIterator Iterator
err error
)
// initialize iterator over locally collected metrics
metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator()
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyCollectedMetrics,
zap.String("error", err.Error()),
)
return
}
// initialize target of local announcements
targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(nil)
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeAnnouncementAccumulator,
zap.String("error", err.Error()),
)
return
}
// iterate over all collected metrics and write them to the target
err = metricsIterator.Iterate(
func(container.SizeEstimation) bool {
return true // local metrics don't know about epochs
},
func(a container.SizeEstimation) error {
a.SetEpoch(c.epoch) // set epoch explicitly
return targetWriter.Put(a)
},
)
if err != nil {
c.log.Debug(logs.ControllerIteratorOverLocallyCollectedMetricsAborted,
zap.String("error", err.Error()),
)
return
}
// finish writing
err = targetWriter.Close(ctx)
if err != nil {
c.log.Debug(logs.ControllerCouldNotFinishWritingLocalAnnouncements,
zap.String("error", err.Error()),
)
return
}
c.log.Debug(logs.ControllerTrustAnnouncementSuccessfullyFinished)
}
func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) {
started := true
c.announceMtx.Lock()
{
if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(ctx)
c.mAnnounceCtx[prm.Epoch] = cancel
started = false
}
}
c.announceMtx.Unlock()
log := &logger.Logger{Logger: c.opts.log.With(
zap.Uint64("epoch", prm.Epoch),
)}
if started {
log.Debug(logs.ControllerAnnouncementIsAlreadyStarted)
return ctx, nil
}
return ctx, &announcer{
commonContext: commonContext{
epoch: prm.Epoch,
ctrl: c,
log: log,
},
}
}
func (c *commonContext) freeAnnouncement() {
var stopped bool
c.ctrl.announceMtx.Lock()
{
var cancel context.CancelFunc
cancel, stopped = c.ctrl.mAnnounceCtx[c.epoch]
if stopped {
cancel()
delete(c.ctrl.mAnnounceCtx, c.epoch)
}
}
c.ctrl.announceMtx.Unlock()
if stopped {
c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted)
} else {
c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted)
}
}
// StopPrm groups the required parameters of the Controller.Stop method.
type StopPrm struct {
// Epoch number the analysis of the values of which must be interrupted.
Epoch uint64
}
type reporter struct {
commonContext
}
// Stop interrupts the processing of container.SizeEstimation values.
//
// Single Stop operation releases an announcement context and overtakes
// all data from AnnouncementAccumulator to ResultReceiver (Controller's
// parameters). Only values for the specified Epoch parameter are processed.
//
// Each call acquires a report context for an Epoch parameter.
// At the very end of the operation, the context is released.
func (c *Controller) Stop(ctx context.Context, prm StopPrm) {
var reporter *reporter
ctx, reporter = c.acquireReport(ctx, prm)
if reporter == nil {
return
}
// finally stop and free reporting
defer reporter.freeReport()
// interrupt announcement
reporter.freeAnnouncement()
// report the estimations
reporter.report(ctx)
}
func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) {
started := true
c.reportMtx.Lock()
{
if cancel := c.mReportCtx[prm.Epoch]; cancel == nil {
ctx, cancel = context.WithCancel(ctx)
c.mReportCtx[prm.Epoch] = cancel
started = false
}
}
c.reportMtx.Unlock()
log := &logger.Logger{Logger: c.opts.log.With(
zap.Uint64("epoch", prm.Epoch),
)}
if started {
log.Debug(logs.ControllerReportIsAlreadyStarted)
return ctx, nil
}
return ctx, &reporter{
commonContext: commonContext{
epoch: prm.Epoch,
ctrl: c,
log: log,
},
}
}
func (c *commonContext) freeReport() {
var stopped bool
c.ctrl.reportMtx.Lock()
{
var cancel context.CancelFunc
cancel, stopped = c.ctrl.mReportCtx[c.epoch]
if stopped {
cancel()
delete(c.ctrl.mReportCtx, c.epoch)
}
}
c.ctrl.reportMtx.Unlock()
if stopped {
c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted)
} else {
c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted)
}
}
func (c *reporter) report(ctx context.Context) {
var (
localIterator Iterator
err error
)
// initialize iterator over locally accumulated announcements
localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator()
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyAccumulatedAnnouncements,
zap.String("error", err.Error()),
)
return
}
// initialize final destination of load estimations
resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(nil)
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeResultTarget,
zap.String("error", err.Error()),
)
return
}
// iterate over all accumulated announcements and write them to the target
err = localIterator.Iterate(
usedSpaceFilterEpochEQ(c.epoch),
resultWriter.Put,
)
if err != nil {
c.log.Debug(logs.ControllerIteratorOverLocalAnnouncementsAborted,
zap.String("error", err.Error()),
)
return
}
// finish writing
err = resultWriter.Close(ctx)
if err != nil {
c.log.Debug(logs.ControllerCouldNotFinishWritingLoadEstimations,
zap.String("error", err.Error()),
)
}
}

View file

@ -1,192 +0,0 @@
package loadcontroller_test
import (
"context"
"math/rand"
"sync"
"testing"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
type testAnnouncementStorage struct {
w loadcontroller.Writer
i loadcontroller.Iterator
mtx sync.RWMutex
m map[uint64][]container.SizeEstimation
}
func newTestStorage() *testAnnouncementStorage {
return &testAnnouncementStorage{
m: make(map[uint64][]container.SizeEstimation),
}
}
func (s *testAnnouncementStorage) InitIterator() (loadcontroller.Iterator, error) {
if s.i != nil {
return s.i, nil
}
return s, nil
}
func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
s.mtx.RLock()
defer s.mtx.RUnlock()
for _, v := range s.m {
for _, a := range v {
if f(a) {
if err := h(a); err != nil {
return err
}
}
}
}
return nil
}
func (s *testAnnouncementStorage) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
if s.w != nil {
return s.w, nil
}
return s, nil
}
func (s *testAnnouncementStorage) Put(v container.SizeEstimation) error {
s.mtx.Lock()
s.m[v.Epoch()] = append(s.m[v.Epoch()], v)
s.mtx.Unlock()
return nil
}
func (s *testAnnouncementStorage) Close(context.Context) error {
return nil
}
func randAnnouncement() (a container.SizeEstimation) {
a.SetContainer(cidtest.ID())
a.SetValue(rand.Uint64())
return
}
func TestSimpleScenario(t *testing.T) {
// create storage to write final estimations
resultStorage := newTestStorage()
// create storages to accumulate announcements
accumulatingStorageN2 := newTestStorage()
// create storage of local metrics
localStorageN1 := newTestStorage()
localStorageN2 := newTestStorage()
// create 2 controllers: 1st writes announcements to 2nd, 2nd directly to final destination
ctrlN1 := loadcontroller.New(loadcontroller.Prm{
LocalMetrics: localStorageN1,
AnnouncementAccumulator: newTestStorage(),
LocalAnnouncementTarget: &testAnnouncementStorage{
w: accumulatingStorageN2,
},
ResultReceiver: resultStorage,
})
ctrlN2 := loadcontroller.New(loadcontroller.Prm{
LocalMetrics: localStorageN2,
AnnouncementAccumulator: accumulatingStorageN2,
LocalAnnouncementTarget: &testAnnouncementStorage{
w: resultStorage,
},
ResultReceiver: resultStorage,
})
const processEpoch uint64 = 10
const goodNum = 4
// create 2 random values for processing epoch and 1 for some different
announces := make([]container.SizeEstimation, 0, goodNum)
for i := 0; i < goodNum; i++ {
a := randAnnouncement()
a.SetEpoch(processEpoch)
announces = append(announces, a)
}
// store one half of "good" announcements to 1st metrics storage, another - to 2nd
// and "bad" to both
for i := 0; i < goodNum/2; i++ {
require.NoError(t, localStorageN1.Put(announces[i]))
}
for i := goodNum / 2; i < goodNum; i++ {
require.NoError(t, localStorageN2.Put(announces[i]))
}
wg := new(sync.WaitGroup)
wg.Add(2)
startPrm := loadcontroller.StartPrm{
Epoch: processEpoch,
}
// start both controllers
go func() {
ctrlN1.Start(context.Background(), startPrm)
wg.Done()
}()
go func() {
ctrlN2.Start(context.Background(), startPrm)
wg.Done()
}()
wg.Wait()
wg.Add(2)
stopPrm := loadcontroller.StopPrm{
Epoch: processEpoch,
}
// stop both controllers
go func() {
ctrlN1.Stop(context.Background(), stopPrm)
wg.Done()
}()
go func() {
ctrlN2.Stop(context.Background(), stopPrm)
wg.Done()
}()
wg.Wait()
// result target should contain all "good" announcements and shoult not container the "bad" one
var res []container.SizeEstimation
err := resultStorage.Iterate(
func(a container.SizeEstimation) bool {
return true
},
func(a container.SizeEstimation) error {
res = append(res, a)
return nil
},
)
require.NoError(t, err)
for i := range announces {
require.Contains(t, res, announces[i])
}
}

View file

@ -1,94 +0,0 @@
package loadcontroller
import (
"context"
"fmt"
"sync"
)
// Prm groups the required parameters of the Controller's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Iterator over the used space values of the containers
// collected by the node locally.
LocalMetrics IteratorProvider
// Place of recording the local values of
// the used space of containers.
LocalAnnouncementTarget WriterProvider
// Iterator over the summarized used space scores
// from the various network participants.
AnnouncementAccumulator IteratorProvider
// Place of recording the final estimates of
// the used space of containers.
ResultReceiver WriterProvider
}
// Controller represents main handler for starting
// and interrupting container volume estimation.
//
// It binds the interfaces of the local value stores
// to the target storage points. Controller is abstracted
// from the internal storage device and the network location
// of the connecting components. At its core, it is a
// high-level start-stop trigger for calculations.
//
// For correct operation, the controller must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the constructor is immediately ready to work through
// API of external control of calculations and data transfer.
type Controller struct {
prm Prm
opts *options
announceMtx sync.Mutex
mAnnounceCtx map[uint64]context.CancelFunc
reportMtx sync.Mutex
mReportCtx map[uint64]context.CancelFunc
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Controller.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Controller does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Controller {
switch {
case prm.LocalMetrics == nil:
panicOnPrmValue("LocalMetrics", prm.LocalMetrics)
case prm.AnnouncementAccumulator == nil:
panicOnPrmValue("AnnouncementAccumulator", prm.AnnouncementAccumulator)
case prm.LocalAnnouncementTarget == nil:
panicOnPrmValue("LocalAnnouncementTarget", prm.LocalAnnouncementTarget)
case prm.ResultReceiver == nil:
panicOnPrmValue("ResultReceiver", prm.ResultReceiver)
}
o := defaultOpts()
for _, opt := range opts {
opt(o)
}
return &Controller{
prm: prm,
opts: o,
mAnnounceCtx: make(map[uint64]context.CancelFunc),
mReportCtx: make(map[uint64]context.CancelFunc),
}
}

View file

@ -1,103 +0,0 @@
package loadcontroller
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// UsedSpaceHandler describes the signature of the container.SizeEstimation
// value handling function.
//
// Termination of processing without failures is usually signaled
// with a zero error, while a specific value may describe the reason
// for failure.
type UsedSpaceHandler func(container.SizeEstimation) error
// UsedSpaceFilter describes the signature of the function for
// checking whether a value meets a certain criterion.
//
// Return of true means conformity, false - vice versa.
type UsedSpaceFilter func(container.SizeEstimation) bool
// Iterator is a group of methods provided by entity
// which can iterate over a group of container.SizeEstimation values.
type Iterator interface {
// Iterate must start an iterator over values that
// meet the filter criterion (returns true).
// For each such value should call a handler, the error
// of which should be directly returned from the method.
//
// Internal failures of the iterator are also signaled via
// an error. After a successful call to the last value
// handler, nil should be returned.
Iterate(UsedSpaceFilter, UsedSpaceHandler) error
}
// IteratorProvider is a group of methods provided
// by entity which generates iterators over
// container.SizeEstimation values.
type IteratorProvider interface {
// InitIterator should return an initialized Iterator.
//
// Initialization problems are reported via error.
// If no error was returned, then the Iterator must not be nil.
//
// Implementations can have different logic for different
// contexts, so specific ones may document their own behavior.
InitIterator() (Iterator, error)
}
// Writer describes the interface for storing container.SizeEstimation values.
//
// This interface is provided by both local storage
// of values and remote (wrappers over the RPC).
type Writer interface {
// Put performs a write operation of container.SizeEstimation value
// and returns any error encountered.
//
// All values after the Close call must be flushed to the
// physical target. Implementations can cache values before
// Close operation.
//
// Put must not be called after Close.
Put(container.SizeEstimation) error
// Close exits with method-providing Writer.
//
// All cached values must be flushed before
// the Close's return.
//
// Methods must not be called after Close.
Close(ctx context.Context) error
}
// WriterProvider is a group of methods provided
// by entity which generates keepers of
// container.SizeEstimation values.
type WriterProvider interface {
// InitWriter should return an initialized Writer.
//
// Initialization problems are reported via error.
// If no error was returned, then the Writer must not be nil.
InitWriter(route []ServerInfo) (Writer, error)
}
// ServerInfo describes a set of
// characteristics of a point in a route.
type ServerInfo interface {
// PublicKey returns public key of the node
// from the route in a binary representation.
PublicKey() []byte
// Iterates over network addresses of the node
// in the route. Breaks iterating on true return
// of the handler.
IterateAddresses(func(string) bool)
// Returns number of server's network addresses.
NumberOfAddresses() int
// ExternalAddresses returns external node's addresses.
ExternalAddresses() []string
}

View file

@ -1,28 +0,0 @@
package loadcontroller
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of Controller.
type Option func(*options)
type options struct {
log *logger.Logger
}
func defaultOpts() *options {
return &options{
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns option to specify logging component.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
if l != nil {
o.log = l
}
}
}

View file

@ -1,36 +0,0 @@
package loadcontroller
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
func usedSpaceFilterEpochEQ(epoch uint64) UsedSpaceFilter {
return func(a container.SizeEstimation) bool {
return a.Epoch() == epoch
}
}
type storageWrapper struct {
w Writer
i Iterator
}
func (s storageWrapper) InitIterator() (Iterator, error) {
return s.i, nil
}
func (s storageWrapper) InitWriter([]ServerInfo) (Writer, error) {
return s.w, nil
}
func SimpleIteratorProvider(i Iterator) IteratorProvider {
return &storageWrapper{
i: i,
}
}
func SimpleWriterProvider(w Writer) WriterProvider {
return &storageWrapper{
w: w,
}
}

View file

@ -1,145 +0,0 @@
package loadroute
import (
"context"
"encoding/hex"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"go.uber.org/zap"
)
// InitWriter initializes and returns Writer that sends each value to its next route point.
//
// If route is present, then it is taken into account,
// and the value will be sent to its continuation. Otherwise, the route will be laid
// from scratch and the value will be sent to its primary point.
//
// After building a list of remote points of the next leg of the route, the value is sent
// sequentially to all of them. If any transmissions (even all) fail, an error will not
// be returned.
//
// Close of the composed Writer calls Close method on each internal Writer generated in
// runtime and never returns an error.
//
// Always returns nil error.
func (r *Router) InitWriter(route []loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
if len(route) == 0 {
route = []loadcontroller.ServerInfo{r.localSrvInfo}
}
return &loadWriter{
router: r,
route: route,
mRoute: make(map[routeKey]*valuesRoute),
mServers: make(map[string]loadcontroller.Writer),
}, nil
}
type routeKey struct {
epoch uint64
cid string
}
type valuesRoute struct {
route []loadcontroller.ServerInfo
values []container.SizeEstimation
}
type loadWriter struct {
router *Router
route []loadcontroller.ServerInfo
routeMtx sync.RWMutex
mRoute map[routeKey]*valuesRoute
mServers map[string]loadcontroller.Writer
}
func (w *loadWriter) Put(a container.SizeEstimation) error {
w.routeMtx.Lock()
defer w.routeMtx.Unlock()
key := routeKey{
epoch: a.Epoch(),
cid: a.Container().EncodeToString(),
}
routeValues, ok := w.mRoute[key]
if !ok {
route, err := w.router.routeBuilder.NextStage(a, w.route)
if err != nil {
return err
} else if len(route) == 0 {
route = []loadcontroller.ServerInfo{nil}
}
routeValues = &valuesRoute{
route: route,
values: []container.SizeEstimation{a},
}
w.mRoute[key] = routeValues
}
for _, remoteInfo := range routeValues.route {
var key string
if remoteInfo != nil {
key = hex.EncodeToString(remoteInfo.PublicKey())
}
remoteWriter, ok := w.mServers[key]
if !ok {
provider, err := w.router.remoteProvider.InitRemote(remoteInfo)
if err != nil {
w.router.log.Debug(logs.RouteCouldNotInitializeWriterProvider,
zap.String("error", err.Error()),
)
continue // best effort
}
remoteWriter, err = provider.InitWriter(w.route)
if err != nil {
w.router.log.Debug(logs.RouteCouldNotInitializeWriter,
zap.String("error", err.Error()),
)
continue // best effort
}
w.mServers[key] = remoteWriter
}
err := remoteWriter.Put(a)
if err != nil {
w.router.log.Debug(logs.RouteCouldNotPutTheValue,
zap.String("error", err.Error()),
)
}
// continue best effort
}
return nil
}
func (w *loadWriter) Close(ctx context.Context) error {
for key, wRemote := range w.mServers {
err := wRemote.Close(ctx)
if err != nil {
w.router.log.Debug(logs.RouteCouldNotCloseRemoteServerWriter,
zap.String("key", key),
zap.String("error", err.Error()),
)
}
}
return nil
}

View file

@ -1,31 +0,0 @@
package loadroute
import (
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// Builder groups methods to route values in the network.
type Builder interface {
// NextStage must return next group of route points for the value a
// based on the passed route.
//
// Empty passed list means being at the starting point of the route.
//
// Must return empty list and no error if the endpoint of the route is reached.
// If there are more than one point to go and the last passed point is included
// in that list (means that point is the last point in one of the route groups),
// returned route must contain nil point that should be interpreted as signal to,
// among sending to other route points, save the announcement in that point.
NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error)
}
// RemoteWriterProvider describes the component
// for sending values to a fixed route point.
type RemoteWriterProvider interface {
// InitRemote must return WriterProvider to the route point
// corresponding to info.
//
// Nil info matches the end of the route.
InitRemote(info loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error)
}

View file

@ -1,28 +0,0 @@
package loadroute
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of Router.
type Option func(*options)
type options struct {
log *logger.Logger
}
func defaultOpts() *options {
return &options{
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns Option to specify logging component.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
if l != nil {
o.log = l
}
}
}

View file

@ -1,49 +0,0 @@
package placementrouter
import "fmt"
// Prm groups the required parameters of the Builder's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Calculator of the container members.
//
// Must not be nil.
PlacementBuilder PlacementBuilder
}
// Builder represents component that routes used container space
// values between nodes from the container.
//
// For correct operation, Builder must be created using
// the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Builder is immediately ready to work through API.
type Builder struct {
placementBuilder PlacementBuilder
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Builder.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Builder does not require additional
// initialization and is completely ready for work.
func New(prm Prm) *Builder {
switch {
case prm.PlacementBuilder == nil:
panicOnPrmValue("PlacementBuilder", prm.PlacementBuilder)
}
return &Builder{
placementBuilder: prm.PlacementBuilder,
}
}

View file

@ -1,47 +0,0 @@
package placementrouter
import (
"bytes"
"fmt"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// NextStage composes container nodes for the container and epoch from a,
// and returns the list of nodes with maximum weight (one from each vector).
//
// If passed route has more than one point, then endpoint of the route is reached.
//
// The traversed route is not checked, it is assumed to be correct.
func (b *Builder) NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) {
if len(passed) > 1 {
return nil, nil
}
cnr := a.Container()
placement, err := b.placementBuilder.BuildPlacement(a.Epoch(), cnr)
if err != nil {
return nil, fmt.Errorf("could not build placement %s: %w", cnr, err)
}
res := make([]loadcontroller.ServerInfo, 0, len(placement))
for i := range placement {
if len(placement[i]) == 0 {
continue
}
if len(passed) == 1 && bytes.Equal(passed[0].PublicKey(), placement[i][0].PublicKey()) {
// add nil element so the announcement will be saved in local memory
res = append(res, nil)
} else {
// add element with remote node to send announcement to
res = append(res, netmapcore.Node(placement[i][0]))
}
}
return res, nil
}

View file

@ -1,14 +0,0 @@
package placementrouter
import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
// PlacementBuilder describes interface of FrostFS placement calculator.
type PlacementBuilder interface {
// BuildPlacement must compose and sort (according to a specific algorithm)
// storage nodes from the container by its identifier using network map
// of particular epoch.
BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error)
}

View file

@ -1,87 +0,0 @@
package loadroute
import (
"fmt"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
// Prm groups the required parameters of the Router's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Characteristics of the local node's server.
//
// Must not be nil.
LocalServerInfo loadcontroller.ServerInfo
// Component for sending values to a fixed route point.
//
// Must not be nil.
RemoteWriterProvider RemoteWriterProvider
// Route planner.
//
// Must not be nil.
Builder Builder
}
// Router represents component responsible for routing
// used container space values over the network.
//
// For each fixed pair (container ID, epoch) there is a
// single value route on the network. Router provides the
// interface for writing values to the next point of the route.
//
// For correct operation, Router must be created using
// the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Router is immediately ready to work through API.
type Router struct {
log *logger.Logger
remoteProvider RemoteWriterProvider
routeBuilder Builder
localSrvInfo loadcontroller.ServerInfo
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Router.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Router does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Router {
switch {
case prm.RemoteWriterProvider == nil:
panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider)
case prm.Builder == nil:
panicOnPrmValue("Builder", prm.Builder)
case prm.LocalServerInfo == nil:
panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo)
}
o := defaultOpts()
for i := range opts {
opts[i](o)
}
return &Router{
log: o.log,
remoteProvider: prm.RemoteWriterProvider,
routeBuilder: prm.Builder,
localSrvInfo: prm.LocalServerInfo,
}
}

View file

@ -1,49 +0,0 @@
package loadroute
import (
"bytes"
"errors"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
var errWrongRoute = errors.New("wrong route")
// CheckRoute checks if the route is a route correctly constructed by the builder for value a.
//
// Returns nil if route is correct, otherwise an error clarifying the inconsistency.
func CheckRoute(builder Builder, a container.SizeEstimation, route []loadcontroller.ServerInfo) error {
for i := 1; i < len(route); i++ {
servers, err := builder.NextStage(a, route[:i])
if err != nil {
return err
} else if len(servers) == 0 {
break
}
found := false
for j := range servers {
if servers[j] == nil {
// nil route point means that
// (i-1)-th node in the route
// must, among other things,
// save the announcement to its
// local memory
continue
}
if bytes.Equal(servers[j].PublicKey(), route[i].PublicKey()) {
found = true
break
}
}
if !found {
return errWrongRoute
}
}
return nil
}

View file

@ -1,151 +0,0 @@
package loadstorage
import (
"context"
"sort"
"sync"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
type usedSpaceEstimations struct {
announcement container.SizeEstimation
sizes []uint64
}
type storageKey struct {
epoch uint64
cid string
}
// Storage represents in-memory storage of
// container.SizeEstimation values.
//
// The write operation has the usual behavior - to save
// the next number of used container space for a specific epoch.
// All values related to one key (epoch, container ID) are stored
// as a list.
//
// Storage also provides an iterator interface, into the handler
// of which the final score is passed, built on all values saved
// at the time of the call. Currently the only possible estimation
// formula is used - the average between 10th and 90th percentile.
//
// For correct operation, Storage must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// Storage is immediately ready to work through API.
type Storage struct {
mtx sync.RWMutex
mItems map[storageKey]*usedSpaceEstimations
}
// Prm groups the required parameters of the Storage's constructor.
//
// The component is not parameterizable at the moment.
type Prm struct{}
// New creates a new instance of the Storage.
//
// The created Storage does not require additional
// initialization and is completely ready for work.
func New(_ Prm) *Storage {
return &Storage{
mItems: make(map[storageKey]*usedSpaceEstimations),
}
}
// Put appends the next value of the occupied container space for the epoch
// to the list of already saved values.
//
// Always returns nil error.
func (s *Storage) Put(a container.SizeEstimation) error {
s.mtx.Lock()
{
key := storageKey{
epoch: a.Epoch(),
cid: a.Container().EncodeToString(),
}
estimations, ok := s.mItems[key]
if !ok {
estimations = &usedSpaceEstimations{
announcement: a,
sizes: make([]uint64, 0, 1),
}
s.mItems[key] = estimations
}
estimations.sizes = append(estimations.sizes, a.Value())
}
s.mtx.Unlock()
return nil
}
func (s *Storage) Close(context.Context) error {
return nil
}
// Iterate goes through all the lists with the key (container ID, epoch),
// calculates the final grade for all values, and passes it to the handler.
//
// Final grade is the average between 10th and 90th percentiles.
func (s *Storage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) (err error) {
s.mtx.RLock()
{
for _, v := range s.mItems {
if f(v.announcement) {
// calculate estimation based on 90th percentile
v.announcement.SetValue(finalEstimation(v.sizes))
if err = h(v.announcement); err != nil {
break
}
}
}
}
s.mtx.RUnlock()
return
}
func finalEstimation(vals []uint64) uint64 {
sort.Slice(vals, func(i, j int) bool {
return vals[i] < vals[j]
})
const (
lowerRank = 10
upperRank = 90
)
if len(vals) >= lowerRank {
lowerInd := percentile(lowerRank, vals)
upperInd := percentile(upperRank, vals)
vals = vals[lowerInd:upperInd]
}
sum := uint64(0)
for i := range vals {
sum += vals[i]
}
return sum / uint64(len(vals))
}
func percentile(rank int, vals []uint64) int {
p := len(vals) * rank / 100
return p
}

View file

@ -1,50 +0,0 @@
package loadstorage
import (
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
func TestStorage(t *testing.T) {
const epoch uint64 = 13
var a container.SizeEstimation
a.SetContainer(cidtest.ID())
a.SetEpoch(epoch)
const opinionsNum = 100
s := New(Prm{})
opinions := make([]uint64, opinionsNum)
for i := range opinions {
opinions[i] = rand.Uint64()
a.SetValue(opinions[i])
require.NoError(t, s.Put(a))
}
iterCounter := 0
err := s.Iterate(
func(ai container.SizeEstimation) bool {
return ai.Epoch() == epoch
},
func(ai container.SizeEstimation) error {
iterCounter++
require.Equal(t, epoch, ai.Epoch())
require.Equal(t, a.Container(), ai.Container())
require.Equal(t, finalEstimation(opinions), ai.Value())
return nil
},
)
require.NoError(t, err)
require.Equal(t, 1, iterCounter)
}