Compare commits

...

8 commits

Author SHA1 Message Date
dc81b4b50c [#725] writecache: Fix metric values
All checks were successful
DCO action / DCO (pull_request) Successful in 1m42s
Build / Build Components (1.20) (pull_request) Successful in 4m21s
Tests and linters / Staticcheck (pull_request) Successful in 4m32s
Tests and linters / Tests (1.21) (pull_request) Successful in 4m46s
Tests and linters / Lint (pull_request) Successful in 5m21s
Tests and linters / Tests (1.20) (pull_request) Successful in 6m14s
Build / Build Components (1.21) (pull_request) Successful in 15m46s
Vulncheck / Vulncheck (pull_request) Successful in 1m21s
Tests and linters / Tests with -race (pull_request) Successful in 4m6s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-27 16:25:04 +03:00
98da032324 [#758] ir: Do not exclude node in maintenance mode from netmap
All checks were successful
DCO action / DCO (pull_request) Successful in 2m59s
Build / Build Components (1.21) (pull_request) Successful in 4m18s
Build / Build Components (1.20) (pull_request) Successful in 4m45s
Vulncheck / Vulncheck (pull_request) Successful in 5m12s
Tests and linters / Tests (1.20) (pull_request) Successful in 9m11s
Tests and linters / Tests (1.21) (pull_request) Successful in 9m10s
Tests and linters / Staticcheck (pull_request) Successful in 9m6s
Tests and linters / Tests with -race (pull_request) Successful in 9m17s
Tests and linters / Lint (pull_request) Successful in 9m41s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-26 16:05:02 +03:00
5acc13fa94 [#732] containersvc: Remove load announcement
Some checks failed
DCO action / DCO (pull_request) Successful in 3m16s
Vulncheck / Vulncheck (pull_request) Successful in 4m7s
Build / Build Components (1.21) (pull_request) Successful in 4m42s
Build / Build Components (1.20) (pull_request) Successful in 6m40s
Tests and linters / Tests with -race (pull_request) Failing after 7m14s
Tests and linters / Tests (1.20) (pull_request) Successful in 8m25s
Tests and linters / Staticcheck (pull_request) Successful in 8m11s
Tests and linters / Tests (1.21) (pull_request) Successful in 8m31s
Tests and linters / Lint (pull_request) Successful in 8m50s
IR code was removed in 8879c6ea.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-09 15:17:10 +03:00
e0f0b93b5e [#723] netmap: Drop already bootstraped check
All checks were successful
DCO action / DCO (pull_request) Successful in 3m48s
Build / Build Components (1.21) (pull_request) Successful in 5m10s
Build / Build Components (1.20) (pull_request) Successful in 5m47s
Vulncheck / Vulncheck (pull_request) Successful in 6m31s
Tests and linters / Tests (1.21) (pull_request) Successful in 7m2s
Tests and linters / Staticcheck (pull_request) Successful in 10m48s
Tests and linters / Lint (pull_request) Successful in 11m16s
Tests and linters / Tests (1.20) (pull_request) Successful in 11m31s
Tests and linters / Tests with -race (pull_request) Successful in 11m21s
Because of this check, under certain conditions,
the node could be removed from the network map,
although the node was functioning normally.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-05 15:31:18 +03:00
eb5248621a [#723] netmap: Send bootstrap at each epoch tick
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-10-05 15:31:10 +03:00
02be6a4341 [#702] node: Update SDK version
Some checks failed
Tests and linters / Lint (pull_request) Failing after 53s
DCO action / DCO (pull_request) Successful in 3m58s
Vulncheck / Vulncheck (pull_request) Successful in 4m44s
Build / Build Components (1.21) (pull_request) Successful in 6m9s
Build / Build Components (1.20) (pull_request) Successful in 6m15s
Tests and linters / Tests (1.21) (pull_request) Successful in 6m59s
Tests and linters / Staticcheck (pull_request) Successful in 6m54s
Tests and linters / Tests (1.20) (pull_request) Successful in 7m15s
Tests and linters / Tests with -race (pull_request) Successful in 7m9s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-09-29 18:46:06 +03:00
368774be95 [#691] node: Compare node info during initial bootstrap properly
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-18 07:30:15 +00:00
b9ef294b99 [#692] go.mod: Update sdk-go
All checks were successful
DCO action / DCO (pull_request) Successful in 4m8s
Vulncheck / Vulncheck (pull_request) Successful in 4m50s
Build / Build Components (1.21) (pull_request) Successful in 6m29s
Build / Build Components (1.20) (pull_request) Successful in 6m44s
Tests and linters / Tests (1.21) (pull_request) Successful in 7m20s
Tests and linters / Staticcheck (pull_request) Successful in 7m12s
Tests and linters / Lint (pull_request) Successful in 7m37s
Tests and linters / Tests (1.20) (pull_request) Successful in 7m35s
Tests and linters / Tests with -race (pull_request) Successful in 8m37s
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-09-15 16:54:15 +03:00
34 changed files with 50 additions and 1908 deletions

View file

@ -346,7 +346,6 @@ type internals struct {
healthStatus *atomic.Int32 healthStatus *atomic.Int32
// is node under maintenance // is node under maintenance
isMaintenance atomic.Bool isMaintenance atomic.Bool
alreadyBootstraped bool
} }
// starts node's maintenance. // starts node's maintenance.

View file

@ -3,44 +3,22 @@ package main
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/ecdsa"
"crypto/sha256"
"errors"
"fmt"
"strconv"
containerV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc" containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc" containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container" containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
loadroute "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route"
placementrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route/placement"
loadstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/storage"
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph" containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap" "go.uber.org/zap"
) )
const ( func initContainerService(_ context.Context, c *cfg) {
startEstimationNotifyEvent = "StartEstimation"
stopEstimationNotifyEvent = "StopEstimation"
)
func initContainerService(ctx context.Context, c *cfg) {
// container wrapper that tries to invoke notary // container wrapper that tries to invoke notary
// requests if chain is configured so // requests if chain is configured so
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary()) wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
@ -52,44 +30,10 @@ func initContainerService(ctx context.Context, c *cfg) {
cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc) cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)
loadAccumulator := loadstorage.New(loadstorage.Prm{})
loadPlacementBuilder := &loadPlacementBuilder{
log: c.log,
nmSrc: c.netMapSource,
cnrSrc: cnrSrc,
}
routeBuilder := placementrouter.New(placementrouter.Prm{
PlacementBuilder: loadPlacementBuilder,
})
loadRouter := loadroute.New(
loadroute.Prm{
LocalServerInfo: c,
RemoteWriterProvider: &remoteLoadAnnounceProvider{
key: &c.key.PrivateKey,
netmapKeys: c,
clientCache: c.bgClientCache,
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
},
Builder: routeBuilder,
},
loadroute.WithLogger(c.log),
)
setLoadController(ctx, c, loadRouter, loadAccumulator)
server := containerTransportGRPC.New( server := containerTransportGRPC.New(
containerService.NewSignService( containerService.NewSignService(
&c.key.PrivateKey, &c.key.PrivateKey,
&usedSpaceService{ containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
loadWriterProvider: loadRouter,
loadPlacementBuilder: loadPlacementBuilder,
routeBuilder: routeBuilder,
cfg: c,
},
), ),
) )
@ -178,50 +122,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
return cnrRdr, cnrWrt return cnrRdr, cnrWrt
} }
func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
pubKey := c.key.PublicKey().Bytes()
// container wrapper that always sends non-notary
// requests
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
fatalOnErr(err)
resultWriter := &morphLoadWriter{
log: c.log,
cnrMorphClient: wrapperNoNotary,
key: pubKey,
}
localMetrics := &localStorageLoad{
log: c.log,
engine: c.cfgObject.cfgLocalStorage.localStorage,
}
ctrl := loadcontroller.New(
loadcontroller.Prm{
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator),
LocalAnnouncementTarget: loadRouter,
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
},
loadcontroller.WithLogger(c.log),
)
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
ctrl.Start(ctx, loadcontroller.StartPrm{
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
})
})
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
ctrl.Stop(ctx, loadcontroller.StopPrm{
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
})
})
}
// addContainerNotificationHandler adds handler that will be executed synchronously. // addContainerNotificationHandler adds handler that will be executed synchronously.
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) { func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
typ := event.TypeFromString(sTyp) typ := event.TypeFromString(sTyp)
@ -284,219 +184,6 @@ func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationPar
c.cfgContainer.parsers[typ] = p c.cfgContainer.parsers[typ] = p
} }
type morphLoadWriter struct {
log *logger.Logger
cnrMorphClient *cntClient.Client
key []byte
}
func (w *morphLoadWriter) Put(a containerSDK.SizeEstimation) error {
w.log.Debug(logs.FrostFSNodeSaveUsedSpaceAnnouncementInContract,
zap.Uint64("epoch", a.Epoch()),
zap.Stringer("cid", a.Container()),
zap.Uint64("size", a.Value()),
)
prm := cntClient.AnnounceLoadPrm{}
prm.SetAnnouncement(a)
prm.SetReporter(w.key)
return w.cnrMorphClient.AnnounceLoad(prm)
}
func (*morphLoadWriter) Close(context.Context) error {
return nil
}
type nopLoadWriter struct{}
func (nopLoadWriter) Put(containerSDK.SizeEstimation) error {
return nil
}
func (nopLoadWriter) Close(context.Context) error {
return nil
}
type remoteLoadAnnounceProvider struct {
key *ecdsa.PrivateKey
netmapKeys netmapCore.AnnouncedKeys
clientCache interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
deadEndProvider loadcontroller.WriterProvider
}
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) {
if srv == nil {
return r.deadEndProvider, nil
}
if r.netmapKeys.IsLocalKey(srv.PublicKey()) {
// if local => return no-op writer
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
}
var info client.NodeInfo
err := client.NodeInfoFromRawNetmapElement(&info, srv)
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
c, err := r.clientCache.Get(info)
if err != nil {
return nil, fmt.Errorf("could not initialize API client: %w", err)
}
return &remoteLoadAnnounceWriterProvider{
client: c,
}, nil
}
type remoteLoadAnnounceWriterProvider struct {
client client.Client
}
func (p *remoteLoadAnnounceWriterProvider) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
return &remoteLoadAnnounceWriter{
client: p.client,
}, nil
}
type remoteLoadAnnounceWriter struct {
client client.Client
buf []containerSDK.SizeEstimation
}
func (r *remoteLoadAnnounceWriter) Put(a containerSDK.SizeEstimation) error {
r.buf = append(r.buf, a)
return nil
}
func (r *remoteLoadAnnounceWriter) Close(ctx context.Context) error {
cliPrm := apiClient.PrmAnnounceSpace{
Announcements: r.buf,
}
_, err := r.client.ContainerAnnounceUsedSpace(ctx, cliPrm)
return err
}
type loadPlacementBuilder struct {
log *logger.Logger
nmSrc netmapCore.Source
cnrSrc containerCore.Source
}
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error) {
cnrNodes, nm, err := l.buildPlacement(epoch, cnr)
if err != nil {
return nil, err
}
const pivotPrefix = "load_announcement_"
pivot := []byte(
pivotPrefix + strconv.FormatUint(epoch, 10),
)
placement, err := nm.PlacementVectors(cnrNodes, pivot)
if err != nil {
return nil, fmt.Errorf("could not build placement vectors: %w", err)
}
return placement, nil
}
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) ([][]netmap.NodeInfo, *netmap.NetMap, error) {
cnr, err := l.cnrSrc.Get(idCnr)
if err != nil {
return nil, nil, err
}
nm, err := l.nmSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, nil, fmt.Errorf("could not get network map: %w", err)
}
binCnr := make([]byte, sha256.Size)
idCnr.Encode(binCnr)
cnrNodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, nil, fmt.Errorf("could not build container nodes: %w", err)
}
return cnrNodes, nm, nil
}
type localStorageLoad struct {
log *logger.Logger
engine *engine.StorageEngine
}
func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
idList, err := engine.ListContainers(context.TODO(), d.engine)
if err != nil {
return fmt.Errorf("list containers on engine failure: %w", err)
}
for i := range idList {
sz, err := engine.ContainerSize(d.engine, idList[i])
if err != nil {
d.log.Debug(logs.FrostFSNodeFailedToCalculateContainerSizeInStorageEngine,
zap.Stringer("cid", idList[i]),
zap.String("error", err.Error()),
)
continue
}
d.log.Debug(logs.FrostFSNodeContainerSizeInStorageEngineCalculatedSuccessfully,
zap.Uint64("size", sz),
zap.Stringer("cid", idList[i]),
)
var a containerSDK.SizeEstimation
a.SetContainer(idList[i])
a.SetValue(sz)
if f != nil && !f(a) {
continue
}
if err := h(a); err != nil {
return err
}
}
return nil
}
type usedSpaceService struct {
containerService.Server
loadWriterProvider loadcontroller.WriterProvider
loadPlacementBuilder *loadPlacementBuilder
routeBuilder loadroute.Builder
cfg *cfg
}
func (c *cfg) PublicKey() []byte { func (c *cfg) PublicKey() []byte {
return nodeKeyFromNetmap(c) return nodeKeyFromNetmap(c)
} }
@ -517,125 +204,6 @@ func (c *cfg) ExternalAddresses() []string {
return c.cfgNodeInfo.localInfo.ExternalAddresses() return c.cfgNodeInfo.localInfo.ExternalAddresses()
} }
func (c *usedSpaceService) PublicKey() []byte {
return nodeKeyFromNetmap(c.cfg)
}
func (c *usedSpaceService) IterateAddresses(f func(string) bool) {
c.cfg.iterateNetworkAddresses(f)
}
func (c *usedSpaceService) NumberOfAddresses() int {
return c.cfg.addressNum()
}
func (c *usedSpaceService) ExternalAddresses() []string {
return c.cfg.ExternalAddresses()
}
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
var passedRoute []loadcontroller.ServerInfo
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
key: hdr.GetBodySignature().GetKey(),
})
}
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
}
passedRoute = append(passedRoute, c)
w, err := c.loadWriterProvider.InitWriter(passedRoute)
if err != nil {
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
}
var est containerSDK.SizeEstimation
for _, aV2 := range req.GetBody().GetAnnouncements() {
err = est.ReadFromV2(aV2)
if err != nil {
return nil, fmt.Errorf("invalid size announcement: %w", err)
}
if err := c.processLoadValue(ctx, est, passedRoute, w); err != nil {
return nil, err
}
}
respBody := new(containerV2.AnnounceUsedSpaceResponseBody)
resp := new(containerV2.AnnounceUsedSpaceResponse)
resp.SetBody(respBody)
c.cfg.respSvc.SetMeta(resp)
return resp, nil
}
var errNodeOutsideContainer = errors.New("node outside the container")
type containerOnlyKeyRemoteServerInfo struct {
key []byte
}
func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte {
return i.key
}
func (*containerOnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) {
}
func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int {
return 0
}
func (*containerOnlyKeyRemoteServerInfo) ExternalAddresses() []string {
return nil
}
func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID, key []byte) (bool, error) {
cnrNodes, _, err := l.buildPlacement(epoch, cnr)
if err != nil {
return false, err
}
for i := range cnrNodes {
for j := range cnrNodes[i] {
if bytes.Equal(cnrNodes[i][j].PublicKey(), key) {
return true, nil
}
}
}
return false, nil
}
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation,
route []loadcontroller.ServerInfo, w loadcontroller.Writer) error {
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey())
if err != nil {
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
} else if !fromCnr {
return errNodeOutsideContainer
}
err = loadroute.CheckRoute(c.routeBuilder, a, route)
if err != nil {
return fmt.Errorf("wrong route of container's used space value: %w", err)
}
err = w.Put(a)
if err != nil {
return fmt.Errorf("could not write container's used space value: %w", err)
}
return nil
}
// implements interface required by container service provided by morph executor. // implements interface required by container service provided by morph executor.
type morphContainerReader struct { type morphContainerReader struct {
eacl containerCore.EACLSource eacl containerCore.EACLSource

View file

@ -179,16 +179,9 @@ func addNewEpochNotificationHandlers(c *cfg) {
return return
} }
n := ev.(netmapEvent.NewEpoch).EpochNumber() if err := c.bootstrap(); err != nil {
const reBootstrapInterval = 2
if (n-c.cfgNetmap.startEpoch)%reBootstrapInterval == 0 {
err := c.bootstrap()
if err != nil {
c.log.Warn(logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err)) c.log.Warn(logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
} }
}
}) })
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
@ -227,10 +220,6 @@ func bootstrapNode(c *cfg) {
c.log.Info(logs.FrostFSNodeNodeIsUnderMaintenanceSkipInitialBootstrap) c.log.Info(logs.FrostFSNodeNodeIsUnderMaintenanceSkipInitialBootstrap)
return return
} }
if c.alreadyBootstraped {
c.log.Info(logs.NetmapNodeAlreadyInCandidateListOnlineSkipInitialBootstrap)
return
}
err := c.bootstrap() err := c.bootstrap()
fatalOnErrDetails("bootstrap error", err) fatalOnErrDetails("bootstrap error", err)
} }
@ -263,7 +252,7 @@ func initNetmapState(c *cfg) {
fatalOnErrDetails("could not initialize current epoch number", err) fatalOnErrDetails("could not initialize current epoch number", err)
var ni *netmapSDK.NodeInfo var ni *netmapSDK.NodeInfo
ni, c.alreadyBootstraped, err = c.netmapInitLocalNodeState(epoch) ni, err = c.netmapInitLocalNodeState(epoch)
fatalOnErrDetails("could not init network state", err) fatalOnErrDetails("could not init network state", err)
stateWord := nodeState(ni) stateWord := nodeState(ni)
@ -282,13 +271,6 @@ func initNetmapState(c *cfg) {
c.handleLocalNodeInfo(ni) c.handleLocalNodeInfo(ni)
} }
func sameNodeInfo(a, b *netmapSDK.NodeInfo) bool {
// Suboptimal, but we do this once on the node startup.
rawA := a.Marshal()
rawB := b.Marshal()
return bytes.Equal(rawA, rawB)
}
func nodeState(ni *netmapSDK.NodeInfo) string { func nodeState(ni *netmapSDK.NodeInfo) string {
if ni != nil { if ni != nil {
switch { switch {
@ -303,29 +285,27 @@ func nodeState(ni *netmapSDK.NodeInfo) string {
return "undefined" return "undefined"
} }
func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, bool, error) { func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
nmNodes, err := c.cfgNetmap.wrapper.GetCandidates() nmNodes, err := c.cfgNetmap.wrapper.GetCandidates()
if err != nil { if err != nil {
return nil, false, err return nil, err
} }
var candidate *netmapSDK.NodeInfo var candidate *netmapSDK.NodeInfo
alreadyBootstraped := false
for i := range nmNodes { for i := range nmNodes {
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) { if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
candidate = &nmNodes[i] candidate = &nmNodes[i]
alreadyBootstraped = candidate.IsOnline() && sameNodeInfo(&c.cfgNodeInfo.localInfo, candidate)
break break
} }
} }
node, err := c.netmapLocalNodeState(epoch) node, err := c.netmapLocalNodeState(epoch)
if err != nil { if err != nil {
return nil, false, err return nil, err
} }
if candidate == nil { if candidate == nil {
return node, false, nil return node, nil
} }
nmState := nodeState(node) nmState := nodeState(node)
@ -337,7 +317,7 @@ func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, bool,
zap.String("netmap", nmState), zap.String("netmap", nmState),
zap.String("candidate", candidateState)) zap.String("candidate", candidateState))
} }
return candidate, alreadyBootstraped, nil return candidate, nil
} }
func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) { func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {

2
go.mod
View file

@ -6,7 +6,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0 git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0 git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230911122224-ac8fc6d4400c git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230928142024-84b9d29fc98c
git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/cheggaaa/pb v1.0.29 github.com/cheggaaa/pb v1.0.29

BIN
go.sum

Binary file not shown.

View file

@ -24,6 +24,8 @@ type (
epochStamp epochStamp
binNodeInfo []byte binNodeInfo []byte
maintenance bool
} }
) )
@ -58,6 +60,7 @@ func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) {
} }
access.binNodeInfo = binNodeInfo access.binNodeInfo = binNodeInfo
access.maintenance = nmNodes[i].IsMaintenance()
newMap[keyString] = access newMap[keyString] = access
} }
@ -105,7 +108,7 @@ func (c *cleanupTable) forEachRemoveCandidate(epoch uint64, f func(string) error
defer c.Unlock() defer c.Unlock()
for keyString, access := range c.lastAccess { for keyString, access := range c.lastAccess {
if epoch-access.epoch > c.threshold { if !access.maintenance && epoch-access.epoch > c.threshold {
access.removeFlag = true // set remove flag access.removeFlag = true // set remove flag
c.lastAccess[keyString] = access c.lastAccess[keyString] = access

View file

@ -124,6 +124,21 @@ func TestCleanupTable(t *testing.T) {
})) }))
require.EqualValues(t, len(infos)-1, cnt) require.EqualValues(t, len(infos)-1, cnt)
}) })
t.Run("skip maintenance nodes", func(t *testing.T) {
cnt := 0
infos[1].SetMaintenance()
key := netmap.StringifyPublicKey(infos[1])
c.update(networkMap, 5)
require.NoError(t,
c.forEachRemoveCandidate(5, func(s string) error {
cnt++
require.NotEqual(t, s, key)
return nil
}))
require.EqualValues(t, len(infos)-1, cnt)
})
}) })
} }

View file

@ -68,16 +68,10 @@ func (m *writeCacheMetrics) Get(d time.Duration, success bool, st writecache.Sto
func (m *writeCacheMetrics) Delete(d time.Duration, success bool, st writecache.StorageType) { func (m *writeCacheMetrics) Delete(d time.Duration, success bool, st writecache.StorageType) {
m.metrics.AddMethodDuration(m.shardID, "Delete", success, d, st.String()) m.metrics.AddMethodDuration(m.shardID, "Delete", success, d, st.String())
if success {
m.metrics.DecActualCount(m.shardID, st.String())
}
} }
func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.StorageType) { func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.StorageType) {
m.metrics.AddMethodDuration(m.shardID, "Put", success, d, st.String()) m.metrics.AddMethodDuration(m.shardID, "Put", success, d, st.String())
if success {
m.metrics.IncActualCount(m.shardID, st.String())
}
} }
func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) { func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
@ -99,7 +93,6 @@ func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) {
} }
func (m *writeCacheMetrics) Evict(st writecache.StorageType) { func (m *writeCacheMetrics) Evict(st writecache.StorageType) {
m.metrics.DecActualCount(m.shardID, st.String())
m.metrics.IncOperationCounter(m.shardID, "Evict", metrics.NullBool{}, st.String()) m.metrics.IncOperationCounter(m.shardID, "Evict", metrics.NullBool{}, st.String())
} }

View file

@ -59,7 +59,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
storagelog.OpField("db DELETE"), storagelog.OpField("db DELETE"),
) )
deleted = true deleted = true
c.objCounters.DecDB() c.decDB()
} }
return err return err
} }

View file

@ -76,7 +76,7 @@ func (c *cache) put(obj objectInfo) error {
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db PUT"), storagelog.OpField("db PUT"),
) )
c.objCounters.IncDB() c.incDB()
} }
return err return err
} }

View file

@ -55,3 +55,13 @@ func (c *cache) initCounters() error {
return nil return nil
} }
func (c *cache) incDB() {
c.objCounters.IncDB()
c.metrics.SetActualCounters(c.objCounters.DB(), 0)
}
func (c *cache) decDB() {
c.objCounters.DecDB()
c.metrics.SetActualCounters(c.objCounters.DB(), 0)
}

View file

@ -73,7 +73,7 @@ func (c *cache) deleteFromDB(keys []internalKey) []internalKey {
} }
for i := 0; i < errorIndex; i++ { for i := 0; i < errorIndex; i++ {
c.objCounters.DecDB() c.decDB()
c.metrics.Evict(writecache.StorageTypeDB) c.metrics.Evict(writecache.StorageTypeDB)
storagelog.Write(c.log, storagelog.Write(c.log,
storagelog.AddressField(keys[i]), storagelog.AddressField(keys[i]),

View file

@ -83,6 +83,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
storagelog.OpField("fstree DELETE"), storagelog.OpField("fstree DELETE"),
) )
deleted = true deleted = true
// counter changed by fstree
c.estimateCacheSize() c.estimateCacheSize()
} }
return metaerr.Wrap(err) return metaerr.Wrap(err)

View file

@ -70,6 +70,7 @@ func (c *cache) runFlushLoop() {
case <-tt.C: case <-tt.C:
c.flushSmallObjects() c.flushSmallObjects()
tt.Reset(defaultFlushInterval) tt.Reset(defaultFlushInterval)
c.estimateCacheSize()
case <-c.closeCh: case <-c.closeCh:
return return
} }

View file

@ -131,6 +131,7 @@ func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) erro
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree PUT"), storagelog.OpField("fstree PUT"),
) )
// counter changed by fstree
c.estimateCacheSize() c.estimateCacheSize()
return nil return nil

View file

@ -72,5 +72,6 @@ func (c *cache) initCounters() error {
return fmt.Errorf("could not read write-cache DB counter: %w", err) return fmt.Errorf("could not read write-cache DB counter: %w", err)
} }
c.objCounters.cDB.Store(inDB) c.objCounters.cDB.Store(inDB)
c.estimateCacheSize()
return nil return nil
} }

View file

@ -73,7 +73,7 @@ func (c *cache) deleteFromDB(key string) {
err := c.db.Batch(func(tx *bbolt.Tx) error { err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
key := []byte(key) key := []byte(key)
recordDeleted = !recordDeleted && b.Get(key) != nil recordDeleted = b.Get(key) != nil
return b.Delete(key) return b.Delete(key)
}) })
@ -122,6 +122,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
storagelog.OpField("fstree DELETE"), storagelog.OpField("fstree DELETE"),
) )
c.metrics.Evict(writecache.StorageTypeFSTree) c.metrics.Evict(writecache.StorageTypeFSTree)
// counter changed by fstree
c.estimateCacheSize() c.estimateCacheSize()
} }
} }

View file

@ -10,16 +10,10 @@ import (
type WriteCacheMetrics interface { type WriteCacheMetrics interface {
AddMethodDuration(shardID string, method string, success bool, d time.Duration, storageType string) AddMethodDuration(shardID string, method string, success bool, d time.Duration, storageType string)
IncActualCount(shardID string, storageType string)
DecActualCount(shardID string, storageType string)
SetActualCount(shardID string, count uint64, storageType string) SetActualCount(shardID string, count uint64, storageType string)
SetEstimateSize(shardID string, size uint64, storageType string) SetEstimateSize(shardID string, size uint64, storageType string)
SetMode(shardID string, mode string) SetMode(shardID string, mode string)
IncOperationCounter(shardID string, operation string, success NullBool, storageType string) IncOperationCounter(shardID string, operation string, success NullBool, storageType string)
Close(shardID string) Close(shardID string)
} }
@ -65,20 +59,6 @@ func (m *writeCacheMetrics) AddMethodDuration(shardID string, method string, suc
).Observe(d.Seconds()) ).Observe(d.Seconds())
} }
func (m *writeCacheMetrics) IncActualCount(shardID string, storageType string) {
m.actualCount.With(prometheus.Labels{
shardIDLabel: shardID,
storageLabel: storageType,
}).Inc()
}
func (m *writeCacheMetrics) DecActualCount(shardID string, storageType string) {
m.actualCount.With(prometheus.Labels{
shardIDLabel: shardID,
storageLabel: storageType,
}).Dec()
}
func (m *writeCacheMetrics) SetActualCount(shardID string, count uint64, storageType string) { func (m *writeCacheMetrics) SetActualCount(shardID string, count uint64, storageType string) {
m.actualCount.With(prometheus.Labels{ m.actualCount.With(prometheus.Labels{
shardIDLabel: shardID, shardIDLabel: shardID,

View file

@ -1,307 +0,0 @@
package loadcontroller
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"go.uber.org/zap"
)
// StartPrm groups the required parameters of the Controller.Start method.
type StartPrm struct {
	// Epoch is the epoch number the announcement is performed for.
	// It identifies the announcement context and is set explicitly
	// on every announced used-space value.
	Epoch uint64
}
// commonContext holds the state shared by a single announce or
// report operation of the Controller.
type commonContext struct {
	// epoch is the epoch number the operation is bound to.
	epoch uint64
	// ctrl is the Controller that started the operation.
	ctrl *Controller
	// log is the operation-scoped logger.
	log *logger.Logger
}
// announcer is the context of a single announcement operation
// started by Controller.Start.
type announcer struct {
	commonContext
}
// Start launches the processing of container.SizeEstimation values.
//
// One Start call moves all data from LocalMetrics to
// LocalAnnouncementTarget (both are Controller's parameters).
// The iterator is not filtered by epoch because the metrics source
// is not expected to track epoch changes.
//
// An announcement context for prm.Epoch is acquired for the duration
// of the call and released right before returning. If an announcement
// for that epoch is already in progress, Start does nothing.
func (c *Controller) Start(ctx context.Context, prm StartPrm) {
	announceCtx, a := c.acquireAnnouncement(ctx, prm)
	if a == nil {
		// announcement for this epoch is already running
		return
	}

	// release the announcement context whatever the outcome is
	defer a.freeAnnouncement()

	// push local values to the announcement target
	a.announce(announceCtx)
}
// announce copies every locally collected metric into the local
// announcement target, stamping each value with the announcer's epoch.
//
// Every failure is logged at debug level and aborts the operation;
// nothing is returned to the caller.
func (c *announcer) announce(ctx context.Context) {
	c.log.Debug(logs.ControllerStartingToAnnounceTheValuesOfTheMetrics)

	// source of the values: iterator over locally collected metrics
	source, err := c.ctrl.prm.LocalMetrics.InitIterator()
	if err != nil {
		c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyCollectedMetrics,
			zap.String("error", err.Error()),
		)

		return
	}

	// destination of the values: target of local announcements
	target, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(nil)
	if err != nil {
		c.log.Debug(logs.ControllerCouldNotInitializeAnnouncementAccumulator,
			zap.String("error", err.Error()),
		)

		return
	}

	// local metrics don't know about epochs, so accept everything
	acceptAll := func(container.SizeEstimation) bool {
		return true
	}

	// the epoch is set explicitly before the value is written
	stampAndPut := func(a container.SizeEstimation) error {
		a.SetEpoch(c.epoch)
		return target.Put(a)
	}

	if err = source.Iterate(acceptAll, stampAndPut); err != nil {
		c.log.Debug(logs.ControllerIteratorOverLocallyCollectedMetricsAborted,
			zap.String("error", err.Error()),
		)

		return
	}

	// finish writing
	if err = target.Close(ctx); err != nil {
		c.log.Debug(logs.ControllerCouldNotFinishWritingLocalAnnouncements,
			zap.String("error", err.Error()),
		)

		return
	}

	c.log.Debug(logs.ControllerTrustAnnouncementSuccessfullyFinished)
}
// acquireAnnouncement registers an announcement context for prm.Epoch.
//
// If no announcement is registered for the epoch, a cancellable child of
// ctx is created, its cancel function is stored in mAnnounceCtx and a new
// announcer bound to the epoch is returned together with the child context.
// Otherwise the passed ctx and a nil announcer are returned.
func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) {
	c.announceMtx.Lock()

	alreadyStarted := c.mAnnounceCtx[prm.Epoch] != nil
	if !alreadyStarted {
		var cancel context.CancelFunc

		ctx, cancel = context.WithCancel(ctx)
		c.mAnnounceCtx[prm.Epoch] = cancel
	}

	c.announceMtx.Unlock()

	// logger scoped to the processed epoch
	epochLog := &logger.Logger{Logger: c.opts.log.With(
		zap.Uint64("epoch", prm.Epoch),
	)}

	if alreadyStarted {
		epochLog.Debug(logs.ControllerAnnouncementIsAlreadyStarted)
		return ctx, nil
	}

	res := &announcer{
		commonContext: commonContext{
			epoch: prm.Epoch,
			ctrl:  c,
			log:   epochLog,
		},
	}

	return ctx, res
}
// freeAnnouncement cancels and unregisters the announcement context
// stored for the epoch of c, if there is one, and logs the outcome.
func (c *commonContext) freeAnnouncement() {
	ctrl := c.ctrl

	ctrl.announceMtx.Lock()

	cancel, registered := ctrl.mAnnounceCtx[c.epoch]
	if registered {
		cancel()
		delete(ctrl.mAnnounceCtx, c.epoch)
	}

	ctrl.announceMtx.Unlock()

	if !registered {
		c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted)
		return
	}

	c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted)
}
// StopPrm groups the required parameters of the Controller.Stop method.
type StopPrm struct {
	// Epoch is the epoch number whose announcement must be interrupted
	// and whose accumulated values must be reported.
	Epoch uint64
}
// reporter is the context of a single report operation
// started by Controller.Stop.
type reporter struct {
	commonContext
}
// Stop interrupts the processing of container.SizeEstimation values.
//
// One Stop call releases the announcement context of prm.Epoch and moves
// all data from AnnouncementAccumulator to ResultReceiver (both are
// Controller's parameters). Only values of the specified epoch are processed.
//
// A report context for prm.Epoch is acquired for the duration of the call
// and released right before returning. If a report for that epoch is
// already in progress, Stop does nothing.
func (c *Controller) Stop(ctx context.Context, prm StopPrm) {
	reportCtx, r := c.acquireReport(ctx, prm)
	if r == nil {
		// report for this epoch is already running
		return
	}

	// release the report context whatever the outcome is
	defer r.freeReport()

	// interrupt the announcement of the same epoch first
	r.freeAnnouncement()

	// then hand the estimations over to the result receiver
	r.report(reportCtx)
}
// acquireReport registers a report context for prm.Epoch.
//
// If no report is registered for the epoch, a cancellable child of ctx is
// created, its cancel function is stored in mReportCtx and a new reporter
// bound to the epoch is returned together with the child context.
// Otherwise the passed ctx and a nil reporter are returned.
func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) {
	c.reportMtx.Lock()

	alreadyStarted := c.mReportCtx[prm.Epoch] != nil
	if !alreadyStarted {
		var cancel context.CancelFunc

		ctx, cancel = context.WithCancel(ctx)
		c.mReportCtx[prm.Epoch] = cancel
	}

	c.reportMtx.Unlock()

	// logger scoped to the processed epoch
	epochLog := &logger.Logger{Logger: c.opts.log.With(
		zap.Uint64("epoch", prm.Epoch),
	)}

	if alreadyStarted {
		epochLog.Debug(logs.ControllerReportIsAlreadyStarted)
		return ctx, nil
	}

	res := &reporter{
		commonContext: commonContext{
			epoch: prm.Epoch,
			ctrl:  c,
			log:   epochLog,
		},
	}

	return ctx, res
}
// freeReport cancels and unregisters the report context stored for
// the epoch of c, if there is one, and logs the outcome.
//
// NOTE(review): the logged messages are the "announcement" ones although
// a report context is released here; confirm whether that is intended.
func (c *commonContext) freeReport() {
	ctrl := c.ctrl

	ctrl.reportMtx.Lock()

	cancel, registered := ctrl.mReportCtx[c.epoch]
	if registered {
		cancel()
		delete(ctrl.mReportCtx, c.epoch)
	}

	ctrl.reportMtx.Unlock()

	if !registered {
		c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted)
		return
	}

	c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted)
}
// report moves the accumulated announcements of the reporter's epoch
// to the result receiver.
//
// Every failure is logged at debug level and aborts the operation;
// nothing is returned to the caller.
func (c *reporter) report(ctx context.Context) {
	// source of the values: iterator over locally accumulated announcements
	accumulated, err := c.ctrl.prm.AnnouncementAccumulator.InitIterator()
	if err != nil {
		c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyAccumulatedAnnouncements,
			zap.String("error", err.Error()),
		)

		return
	}

	// destination of the values: final receiver of load estimations
	results, err := c.ctrl.prm.ResultReceiver.InitWriter(nil)
	if err != nil {
		c.log.Debug(logs.ControllerCouldNotInitializeResultTarget,
			zap.String("error", err.Error()),
		)

		return
	}

	// only the announcements of the processed epoch are written
	onlyThisEpoch := usedSpaceFilterEpochEQ(c.epoch)

	if err = accumulated.Iterate(onlyThisEpoch, results.Put); err != nil {
		c.log.Debug(logs.ControllerIteratorOverLocalAnnouncementsAborted,
			zap.String("error", err.Error()),
		)

		return
	}

	// finish writing
	if err = results.Close(ctx); err != nil {
		c.log.Debug(logs.ControllerCouldNotFinishWritingLoadEstimations,
			zap.String("error", err.Error()),
		)
	}
}

View file

@ -1,192 +0,0 @@
package loadcontroller_test
import (
"context"
"math/rand"
"sync"
"testing"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
// testAnnouncementStorage is an in-memory storage of announcements used
// in tests. It acts as iterator, iterator provider, writer and writer
// provider at once.
type testAnnouncementStorage struct {
	// w, if set, is returned from InitWriter instead of the storage itself.
	w loadcontroller.Writer
	// i, if set, is returned from InitIterator instead of the storage itself.
	i loadcontroller.Iterator
	// mtx protects m.
	mtx sync.RWMutex
	// m maps an epoch number to the announcements stored for it.
	m map[uint64][]container.SizeEstimation
}
// newTestStorage returns an empty test storage that is ready to use.
func newTestStorage() *testAnnouncementStorage {
	s := new(testAnnouncementStorage)
	s.m = make(map[uint64][]container.SizeEstimation)

	return s
}
// InitIterator returns the overriding iterator if one is set,
// otherwise the storage itself. The error is always nil.
func (s *testAnnouncementStorage) InitIterator() (loadcontroller.Iterator, error) {
	if s.i == nil {
		return s, nil
	}

	return s.i, nil
}
// Iterate passes every stored announcement accepted by f to h.
// The first error returned by h stops the iteration and is returned.
func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	for _, perEpoch := range s.m {
		for i := range perEpoch {
			a := perEpoch[i]

			if !f(a) {
				continue
			}

			if err := h(a); err != nil {
				return err
			}
		}
	}

	return nil
}
// InitWriter returns the overriding writer if one is set,
// otherwise the storage itself. The route is ignored and
// the error is always nil.
func (s *testAnnouncementStorage) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
	if s.w == nil {
		return s, nil
	}

	return s.w, nil
}
// Put appends v to the list of announcements of its epoch.
// Always returns nil.
func (s *testAnnouncementStorage) Put(v container.SizeEstimation) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	epoch := v.Epoch()
	s.m[epoch] = append(s.m[epoch], v)

	return nil
}
// Close does nothing and always returns nil.
func (s *testAnnouncementStorage) Close(context.Context) error {
	return nil
}
// randAnnouncement returns an estimation with a test container ID
// and a random value. The epoch is left unset.
func randAnnouncement() (a container.SizeEstimation) {
	var estimation container.SizeEstimation

	estimation.SetContainer(cidtest.ID())
	estimation.SetValue(rand.Uint64())

	return estimation
}
// TestSimpleScenario runs two controllers through a Start/Stop cycle for
// a single epoch and checks that every locally stored announcement ends
// up in the result storage.
func TestSimpleScenario(t *testing.T) {
	// create storage to write final estimations
	resultStorage := newTestStorage()
	// create storage to accumulate announcements of the 2nd controller
	accumulatingStorageN2 := newTestStorage()
	// create storages of local metrics
	localStorageN1 := newTestStorage()
	localStorageN2 := newTestStorage()
	// create 2 controllers: 1st writes announcements to 2nd, 2nd directly to final destination
	ctrlN1 := loadcontroller.New(loadcontroller.Prm{
		LocalMetrics:            localStorageN1,
		AnnouncementAccumulator: newTestStorage(),
		LocalAnnouncementTarget: &testAnnouncementStorage{
			w: accumulatingStorageN2,
		},
		ResultReceiver: resultStorage,
	})
	ctrlN2 := loadcontroller.New(loadcontroller.Prm{
		LocalMetrics:            localStorageN2,
		AnnouncementAccumulator: accumulatingStorageN2,
		LocalAnnouncementTarget: &testAnnouncementStorage{
			w: resultStorage,
		},
		ResultReceiver: resultStorage,
	})
	const processEpoch uint64 = 10
	const goodNum = 4
	// create goodNum random announcements for the processing epoch
	announces := make([]container.SizeEstimation, 0, goodNum)
	for i := 0; i < goodNum; i++ {
		a := randAnnouncement()
		a.SetEpoch(processEpoch)
		announces = append(announces, a)
	}
	// store one half of the announcements to the 1st metrics storage,
	// the other half to the 2nd one
	for i := 0; i < goodNum/2; i++ {
		require.NoError(t, localStorageN1.Put(announces[i]))
	}
	for i := goodNum / 2; i < goodNum; i++ {
		require.NoError(t, localStorageN2.Put(announces[i]))
	}
	wg := new(sync.WaitGroup)
	wg.Add(2)
	startPrm := loadcontroller.StartPrm{
		Epoch: processEpoch,
	}
	// start both controllers
	go func() {
		ctrlN1.Start(context.Background(), startPrm)
		wg.Done()
	}()
	go func() {
		ctrlN2.Start(context.Background(), startPrm)
		wg.Done()
	}()
	wg.Wait()
	wg.Add(2)
	stopPrm := loadcontroller.StopPrm{
		Epoch: processEpoch,
	}
	// stop both controllers
	go func() {
		ctrlN1.Stop(context.Background(), stopPrm)
		wg.Done()
	}()
	go func() {
		ctrlN2.Stop(context.Background(), stopPrm)
		wg.Done()
	}()
	wg.Wait()
	// the result storage should contain all the announcements
	var res []container.SizeEstimation
	err := resultStorage.Iterate(
		func(a container.SizeEstimation) bool {
			return true
		},
		func(a container.SizeEstimation) error {
			res = append(res, a)
			return nil
		},
	)
	require.NoError(t, err)
	for i := range announces {
		require.Contains(t, res, announces[i])
	}
}

View file

@ -1,94 +0,0 @@
package loadcontroller
import (
"context"
"fmt"
"sync"
)
// Prm groups the required parameters of the Controller's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
	// LocalMetrics provides an iterator over the used space values
	// of the containers collected by the node locally.
	//
	// Must not be nil.
	LocalMetrics IteratorProvider
	// LocalAnnouncementTarget is the place where the local values
	// of the used space of containers are recorded.
	//
	// Must not be nil.
	LocalAnnouncementTarget WriterProvider
	// AnnouncementAccumulator provides an iterator over the summarized
	// used space scores from the various network participants.
	//
	// Must not be nil.
	AnnouncementAccumulator IteratorProvider
	// ResultReceiver is the place where the final estimates of
	// the used space of containers are recorded.
	//
	// Must not be nil.
	ResultReceiver WriterProvider
}
// Controller represents the main handler for starting
// and interrupting container volume estimation.
//
// It binds the interfaces of the local value stores
// to the target storage points. Controller is abstracted
// from the internal storage device and the network location
// of the connecting components. At its core, it is a
// high-level start-stop trigger for calculations.
//
// For correct operation, the Controller must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Controller is immediately ready to work through
// API of external control of calculations and data transfer.
type Controller struct {
	// prm holds the required parameters passed to New.
	prm Prm
	// opts holds the optional parameters.
	opts *options
	// announceMtx protects mAnnounceCtx.
	announceMtx sync.Mutex
	// mAnnounceCtx maps an epoch number to the cancel function
	// of the announcement that is in progress for that epoch.
	mAnnounceCtx map[uint64]context.CancelFunc
	// reportMtx protects mReportCtx.
	reportMtx sync.Mutex
	// mReportCtx maps an epoch number to the cancel function
	// of the report that is in progress for that epoch.
	mReportCtx map[uint64]context.CancelFunc
}
// invalidPrmValFmt is the format of the panic message raised for
// an invalid constructor parameter: name, type and value.
const invalidPrmValFmt = "invalid parameter %s (%T):%v"

// panicOnPrmValue panics with a message that names the invalid
// parameter n and shows the type and the value of v.
func panicOnPrmValue(n string, v any) {
	msg := fmt.Sprintf(invalidPrmValFmt, n, v, v)
	panic(msg)
}
// New creates a new instance of the Controller.
//
// Panics if at least one of the required parameters is nil.
//
// The created Controller does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Controller {
	if prm.LocalMetrics == nil {
		panicOnPrmValue("LocalMetrics", prm.LocalMetrics)
	}

	if prm.AnnouncementAccumulator == nil {
		panicOnPrmValue("AnnouncementAccumulator", prm.AnnouncementAccumulator)
	}

	if prm.LocalAnnouncementTarget == nil {
		panicOnPrmValue("LocalAnnouncementTarget", prm.LocalAnnouncementTarget)
	}

	if prm.ResultReceiver == nil {
		panicOnPrmValue("ResultReceiver", prm.ResultReceiver)
	}

	// start from the defaults and apply the options in the passed order
	o := defaultOpts()
	for i := range opts {
		opts[i](o)
	}

	ctrl := new(Controller)
	ctrl.prm = prm
	ctrl.opts = o
	ctrl.mAnnounceCtx = make(map[uint64]context.CancelFunc)
	ctrl.mReportCtx = make(map[uint64]context.CancelFunc)

	return ctrl
}

View file

@ -1,103 +0,0 @@
package loadcontroller
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// UsedSpaceHandler describes the signature of the function that
// handles a container.SizeEstimation value.
//
// Successful processing is signaled with a nil error, while
// a non-nil error describes the reason for the failure.
type UsedSpaceHandler func(container.SizeEstimation) error
// UsedSpaceFilter describes the signature of the function that
// checks whether a value meets a certain criterion.
//
// Returns true if the value meets the criterion, false otherwise.
type UsedSpaceFilter func(container.SizeEstimation) bool
// Iterator is a group of methods provided by an entity
// which can iterate over a group of container.SizeEstimation values.
type Iterator interface {
	// Iterate must go through the values that meet the filter
	// criterion (the filter returns true) and call the handler
	// for each of them. An error returned by the handler must
	// be returned from the method as is.
	//
	// Internal failures of the iterator are also signaled via
	// an error. After a successful call of the handler for
	// the last value, nil must be returned.
	Iterate(UsedSpaceFilter, UsedSpaceHandler) error
}
// IteratorProvider is a group of methods provided
// by an entity which generates iterators over
// container.SizeEstimation values.
type IteratorProvider interface {
	// InitIterator must return an initialized Iterator.
	//
	// Initialization problems are reported via error.
	// If no error is returned, the Iterator must not be nil.
	//
	// Implementations can have different logic for different
	// contexts, so specific ones may document their own behavior.
	InitIterator() (Iterator, error)
}
// Writer describes the interface for storing container.SizeEstimation values.
//
// This interface is provided by both local storages
// of values and remote ones (wrappers over the RPC).
type Writer interface {
	// Put writes a container.SizeEstimation value
	// and returns any error encountered.
	//
	// Implementations may cache values until Close is called:
	// all values must be flushed to the physical target once
	// Close returns.
	//
	// Put must not be called after Close.
	Put(container.SizeEstimation) error
	// Close finishes the work with the Writer.
	//
	// All cached values must be flushed before
	// Close returns.
	//
	// Methods must not be called after Close.
	Close(ctx context.Context) error
}
// WriterProvider is a group of methods provided
// by an entity which generates writers of
// container.SizeEstimation values.
type WriterProvider interface {
	// InitWriter must return an initialized Writer.
	//
	// Initialization problems are reported via error.
	// If no error is returned, the Writer must not be nil.
	InitWriter(route []ServerInfo) (Writer, error)
}
// ServerInfo describes a set of
// characteristics of a point in a route.
type ServerInfo interface {
	// PublicKey returns the public key of the node
	// from the route in a binary representation.
	PublicKey() []byte
	// IterateAddresses iterates over the network addresses of
	// the node in the route. Iteration stops when the handler
	// returns true.
	IterateAddresses(func(string) bool)
	// NumberOfAddresses returns the number of the server's
	// network addresses.
	NumberOfAddresses() int
	// ExternalAddresses returns the external addresses of the node.
	ExternalAddresses() []string
}

View file

@ -1,28 +0,0 @@
package loadcontroller
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of the Controller.
// Options are applied by New on top of the default values.
type Option func(*options)
// options groups the optional parameters of the Controller.
type options struct {
	// log is the logging component.
	log *logger.Logger
}
// defaultOpts returns the options used when nothing is overridden:
// the logger wraps the global zap logger.
func defaultOpts() *options {
	o := new(options)
	o.log = &logger.Logger{Logger: zap.L()}

	return o
}
// WithLogger returns an option that sets the logging component.
// A nil logger is ignored and leaves the current one in place.
func WithLogger(l *logger.Logger) Option {
	return func(o *options) {
		if l == nil {
			return
		}

		o.log = l
	}
}

View file

@ -1,36 +0,0 @@
package loadcontroller
import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// usedSpaceFilterEpochEQ returns a filter that accepts only
// the estimations whose epoch equals the given one.
func usedSpaceFilterEpochEQ(epoch uint64) UsedSpaceFilter {
	var sameEpoch UsedSpaceFilter = func(a container.SizeEstimation) bool {
		return epoch == a.Epoch()
	}

	return sameEpoch
}
// storageWrapper turns a ready Writer and/or Iterator
// into the corresponding provider.
type storageWrapper struct {
	// w is returned from InitWriter.
	w Writer
	// i is returned from InitIterator.
	i Iterator
}
// InitIterator returns the wrapped Iterator (nil if none was set)
// and a nil error.
func (s storageWrapper) InitIterator() (Iterator, error) {
	return s.i, nil
}
// InitWriter returns the wrapped Writer (nil if none was set)
// and a nil error. The route is ignored.
func (s storageWrapper) InitWriter([]ServerInfo) (Writer, error) {
	return s.w, nil
}
// SimpleIteratorProvider returns an IteratorProvider
// that always provides the given Iterator.
func SimpleIteratorProvider(i Iterator) IteratorProvider {
	wrapper := new(storageWrapper)
	wrapper.i = i

	return wrapper
}
// SimpleWriterProvider returns a WriterProvider
// that always provides the given Writer.
func SimpleWriterProvider(w Writer) WriterProvider {
	wrapper := new(storageWrapper)
	wrapper.w = w

	return wrapper
}

View file

@ -1,145 +0,0 @@
package loadroute
import (
"context"
"encoding/hex"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"go.uber.org/zap"
)
// InitWriter initializes and returns a Writer that sends each value
// to the next point of its route.
//
// A non-empty route is taken into account and the value is sent to its
// continuation. An empty route is replaced with the one that consists of
// the local server only, i.e. the route is laid from scratch.
//
// After the list of remote points of the next leg of the route is built,
// the value is sent sequentially to all of them. Transmission failures
// (even if all transmissions fail) are not reported as an error.
//
// Close of the composed Writer calls Close on each internal Writer
// created at runtime and never returns an error.
//
// Always returns nil error.
func (r *Router) InitWriter(route []loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
	w := &loadWriter{
		router:   r,
		route:    route,
		mRoute:   make(map[routeKey]*valuesRoute),
		mServers: make(map[string]loadcontroller.Writer),
	}

	if len(w.route) == 0 {
		// start of the route: the local node is its first point
		w.route = []loadcontroller.ServerInfo{r.localSrvInfo}
	}

	return w, nil
}
// routeKey identifies a route: one route is kept
// per (epoch, container ID) pair.
type routeKey struct {
	// epoch is the epoch of the estimation.
	epoch uint64
	// cid is the string-encoded container ID of the estimation.
	cid string
}
// valuesRoute is the next leg of a route together with
// the values recorded for it.
type valuesRoute struct {
	// route lists the points of the next stage; a nil element
	// stands for the end of the route.
	route []loadcontroller.ServerInfo
	// values holds the estimation the route was built for.
	values []container.SizeEstimation
}
// loadWriter is the Writer returned by Router.InitWriter. It forwards
// each value to the writers of the next points of its route.
type loadWriter struct {
	// router is the Router that created the writer.
	router *Router
	// route is the already passed part of the route.
	route []loadcontroller.ServerInfo
	// routeMtx is held for the whole duration of Put.
	routeMtx sync.RWMutex
	// mRoute caches the next route stage per (epoch, container ID).
	mRoute map[routeKey]*valuesRoute
	// mServers caches the remote writers by the hex-encoded public key
	// of the route point; the empty key stands for a nil point.
	mServers map[string]loadcontroller.Writer
}
// Put sends the estimation to every point of the next stage of its route.
//
// The next stage is calculated once per (epoch, container ID) pair and
// cached. An empty stage is replaced with a single nil point. Writers of
// the points are created lazily and cached by the public key of the point.
//
// Only the error of the route builder is returned; initialization and
// transmission failures are logged and skipped (best effort).
func (w *loadWriter) Put(a container.SizeEstimation) error {
	w.routeMtx.Lock()
	defer w.routeMtx.Unlock()

	rKey := routeKey{
		epoch: a.Epoch(),
		cid:   a.Container().EncodeToString(),
	}

	stage, cached := w.mRoute[rKey]
	if !cached {
		next, err := w.router.routeBuilder.NextStage(a, w.route)
		if err != nil {
			return err
		}

		if len(next) == 0 {
			next = []loadcontroller.ServerInfo{nil}
		}

		stage = &valuesRoute{
			route:  next,
			values: []container.SizeEstimation{a},
		}

		w.mRoute[rKey] = stage
	}

	for _, point := range stage.route {
		// a nil point is stored under the empty key
		var srvKey string
		if point != nil {
			srvKey = hex.EncodeToString(point.PublicKey())
		}

		remote, known := w.mServers[srvKey]
		if !known {
			provider, err := w.router.remoteProvider.InitRemote(point)
			if err != nil {
				w.router.log.Debug(logs.RouteCouldNotInitializeWriterProvider,
					zap.String("error", err.Error()),
				)

				continue // best effort
			}

			remote, err = provider.InitWriter(w.route)
			if err != nil {
				w.router.log.Debug(logs.RouteCouldNotInitializeWriter,
					zap.String("error", err.Error()),
				)

				continue // best effort
			}

			w.mServers[srvKey] = remote
		}

		if err := remote.Put(a); err != nil {
			w.router.log.Debug(logs.RouteCouldNotPutTheValue,
				zap.String("error", err.Error()),
			)
		}

		// continue best effort
	}

	return nil
}
// Close closes every remote writer created by Put. Failures are
// logged together with the key of the writer; nil is always returned.
func (w *loadWriter) Close(ctx context.Context) error {
	for key, remote := range w.mServers {
		if err := remote.Close(ctx); err != nil {
			w.router.log.Debug(logs.RouteCouldNotCloseRemoteServerWriter,
				zap.String("key", key),
				zap.String("error", err.Error()),
			)
		}
	}

	return nil
}

View file

@ -1,31 +0,0 @@
package loadroute
import (
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// Builder groups methods to route values in the network.
type Builder interface {
	// NextStage must return the next group of route points for the value a
	// based on the passed route.
	//
	// An empty passed list means being at the starting point of the route.
	//
	// Must return an empty list and no error if the endpoint of the route
	// is reached. If there is more than one point to go to and the last
	// passed point is included in that list (i.e. that point is the last
	// point in one of the route groups), the returned route must contain
	// a nil point. The nil point must be interpreted as a signal to save
	// the announcement in that point in addition to sending it to the
	// other route points.
	NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error)
}
// RemoteWriterProvider describes the component
// for sending values to a fixed route point.
type RemoteWriterProvider interface {
	// InitRemote must return the WriterProvider of the route point
	// that corresponds to info.
	//
	// Nil info matches the end of the route.
	InitRemote(info loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error)
}

View file

@ -1,28 +0,0 @@
package loadroute
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of the Router.
// Options are applied by New on top of the default values.
type Option func(*options)
// options groups the optional parameters of the Router.
type options struct {
	// log is the logging component.
	log *logger.Logger
}
// defaultOpts returns the options used when nothing is overridden:
// the logger wraps the global zap logger.
func defaultOpts() *options {
	var o options

	o.log = &logger.Logger{Logger: zap.L()}

	return &o
}
// WithLogger returns an Option that sets the logging component.
// A nil logger is ignored and leaves the current one in place.
func WithLogger(l *logger.Logger) Option {
	return func(o *options) {
		if l == nil {
			return
		}

		o.log = l
	}
}

View file

@ -1,49 +0,0 @@
package placementrouter
import "fmt"
// Prm groups the required parameters of the Builder's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
	// PlacementBuilder calculates the members of a container.
	//
	// Must not be nil.
	PlacementBuilder PlacementBuilder
}
// Builder represents the component that routes used container space
// values between the nodes of the container.
//
// For correct operation, the Builder must be created using
// the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Builder is immediately ready to work through API.
type Builder struct {
	// placementBuilder calculates the container nodes the route is built from.
	placementBuilder PlacementBuilder
}
// invalidPrmValFmt is the format of the panic message raised for
// an invalid constructor parameter: name, type and value.
const invalidPrmValFmt = "invalid parameter %s (%T):%v"

// panicOnPrmValue panics with a message that names the invalid
// parameter n and shows the type and the value of v.
func panicOnPrmValue(n string, v any) {
	args := []any{n, v, v}
	panic(fmt.Sprintf(invalidPrmValFmt, args...))
}
// New creates a new instance of the Builder.
//
// Panics if prm.PlacementBuilder is nil.
//
// The created Builder does not require additional
// initialization and is completely ready for work.
func New(prm Prm) *Builder {
	if prm.PlacementBuilder == nil {
		panicOnPrmValue("PlacementBuilder", prm.PlacementBuilder)
	}

	b := new(Builder)
	b.placementBuilder = prm.PlacementBuilder

	return b
}

View file

@ -1,47 +0,0 @@
package placementrouter
import (
"bytes"
"fmt"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// NextStage composes the container nodes for the container and epoch of a
// and returns the first node of every non-empty placement vector.
//
// If the passed route has more than one point, the endpoint of the route
// is reached and an empty result is returned.
//
// If the passed route has exactly one point and it is the first node of
// a vector, a nil element is returned in its place: the announcement must
// be saved in local memory instead of being sent.
//
// The traversed route is not checked, it is assumed to be correct.
func (b *Builder) NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) {
	if len(passed) > 1 {
		return nil, nil
	}

	cnr := a.Container()

	placement, err := b.placementBuilder.BuildPlacement(a.Epoch(), cnr)
	if err != nil {
		return nil, fmt.Errorf("could not build placement %s: %w", cnr, err)
	}

	res := make([]loadcontroller.ServerInfo, 0, len(placement))

	for _, vector := range placement {
		if len(vector) == 0 {
			continue
		}

		// nil means "save the announcement in local memory"
		var point loadcontroller.ServerInfo

		if len(passed) != 1 || !bytes.Equal(passed[0].PublicKey(), vector[0].PublicKey()) {
			// remote node to send the announcement to
			point = netmapcore.Node(vector[0])
		}

		res = append(res, point)
	}

	return res, nil
}

View file

@ -1,14 +0,0 @@
package placementrouter
import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
// PlacementBuilder describes the interface of the FrostFS placement calculator.
type PlacementBuilder interface {
	// BuildPlacement must compose and sort (according to a specific
	// algorithm) the storage nodes of the container with the given
	// identifier using the network map of the given epoch.
	BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error)
}

View file

@ -1,87 +0,0 @@
package loadroute
import (
"fmt"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
// Prm groups the required parameters of the Router's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
	// LocalServerInfo describes the server of the local node.
	// It is used as the first point of a route laid from scratch.
	//
	// Must not be nil.
	LocalServerInfo loadcontroller.ServerInfo
	// RemoteWriterProvider is the component for sending values
	// to a fixed route point.
	//
	// Must not be nil.
	RemoteWriterProvider RemoteWriterProvider
	// Builder is the route planner.
	//
	// Must not be nil.
	Builder Builder
}
// Router represents the component responsible for routing
// used container space values over the network.
//
// For each fixed pair (container ID, epoch) there is a
// single value route on the network. Router provides the
// interface for writing values to the next point of the route.
//
// For correct operation, the Router must be created using
// the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Router is immediately ready to work through API.
type Router struct {
	// log is the logging component.
	log *logger.Logger
	// remoteProvider creates the writer providers of the route points.
	remoteProvider RemoteWriterProvider
	// routeBuilder plans the next stage of a route.
	routeBuilder Builder
	// localSrvInfo describes the server of the local node.
	localSrvInfo loadcontroller.ServerInfo
}
// invalidPrmValFmt is the format of the panic message raised for
// an invalid constructor parameter: name, type and value.
const invalidPrmValFmt = "invalid parameter %s (%T):%v"

// panicOnPrmValue panics with a message that names the invalid
// parameter n and shows the type and the value of v.
func panicOnPrmValue(n string, v any) {
	reason := fmt.Sprintf(invalidPrmValFmt, n, v, v)
	panic(reason)
}
// New creates a new instance of the Router.
//
// Panics if at least one of the required parameters is nil.
//
// The created Router does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Router {
	if prm.RemoteWriterProvider == nil {
		panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider)
	}

	if prm.Builder == nil {
		panicOnPrmValue("Builder", prm.Builder)
	}

	if prm.LocalServerInfo == nil {
		panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo)
	}

	// start from the defaults and apply the options in the passed order
	o := defaultOpts()
	for _, apply := range opts {
		apply(o)
	}

	r := new(Router)
	r.log = o.log
	r.remoteProvider = prm.RemoteWriterProvider
	r.routeBuilder = prm.Builder
	r.localSrvInfo = prm.LocalServerInfo

	return r
}

View file

@ -1,49 +0,0 @@
package loadroute
import (
"bytes"
"errors"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// errWrongRoute is returned by CheckRoute when a route point is not
// among the points produced by the builder for the preceding part of
// the route.
var errWrongRoute = errors.New("wrong route")
// CheckRoute checks whether the route is the one correctly constructed
// by the builder for the value a.
//
// Every point of the route except the first one must be present (by
// public key) in the stage that the builder produces for the preceding
// part of the route. The check stops as soon as the builder reports
// the end of the route.
//
// Returns nil if the route is correct, the builder's error if a stage
// could not be calculated, or errWrongRoute otherwise.
func CheckRoute(builder Builder, a container.SizeEstimation, route []loadcontroller.ServerInfo) error {
	for i := 1; i < len(route); i++ {
		stage, err := builder.NextStage(a, route[:i])
		if err != nil {
			return err
		}

		if len(stage) == 0 {
			// endpoint of the route is reached
			break
		}

		matched := false

		for _, srv := range stage {
			// a nil route point means that the (i-1)-th node of the
			// route must, among other things, save the announcement
			// to its local memory; it never matches a route point
			if srv != nil && bytes.Equal(srv.PublicKey(), route[i].PublicKey()) {
				matched = true
				break
			}
		}

		if !matched {
			return errWrongRoute
		}
	}

	return nil
}

View file

@ -1,151 +0,0 @@
package loadstorage
import (
"context"
"sort"
"sync"
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
)
// usedSpaceEstimations accumulates the values saved for one
// (epoch, container ID) key.
type usedSpaceEstimations struct {
	// announcement is the first estimation saved for the key.
	announcement container.SizeEstimation
	// sizes lists all the values saved for the key.
	sizes []uint64
}
// storageKey is the key of the Storage items.
type storageKey struct {
	// epoch is the epoch of the estimation.
	epoch uint64
	// cid is the string-encoded container ID of the estimation.
	cid string
}
// Storage represents an in-memory storage of
// container.SizeEstimation values.
//
// The write operation has the usual behavior - to save
// the next number of used container space for a specific epoch.
// All values related to one key (epoch, container ID) are stored
// as a list.
//
// Storage also provides an iterator interface. The handler receives
// the final score built on all values saved at the time of the call.
// Currently the only estimation formula is the mean of the saved values;
// when at least 10 values are saved, only the ones between the 10th and
// the 90th percentile ranks are taken into account.
//
// For correct operation, Storage must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// Storage is immediately ready to work through API.
type Storage struct {
	// mtx protects mItems.
	mtx sync.RWMutex
	// mItems maps (epoch, container ID) to the accumulated values.
	mItems map[storageKey]*usedSpaceEstimations
}
// Prm groups the required parameters of the Storage's constructor.
//
// The component has no parameters at the moment.
type Prm struct{}
// New creates a new instance of the Storage.
//
// The created Storage does not require additional
// initialization and is completely ready for work.
func New(_ Prm) *Storage {
	s := new(Storage)
	s.mItems = make(map[storageKey]*usedSpaceEstimations)

	return s
}
// Put records the value of the given estimation under the key built from
// its epoch and container ID. The first estimation seen for a key is kept
// as the entry's announcement; every call appends its value to the
// entry's list.
//
// Always returns nil error.
func (s *Storage) Put(a container.SizeEstimation) error {
	// The key depends only on the argument, so build it before locking.
	k := storageKey{
		epoch: a.Epoch(),
		cid:   a.Container().EncodeToString(),
	}

	s.mtx.Lock()
	defer s.mtx.Unlock()

	entry, found := s.mItems[k]
	if !found {
		entry = &usedSpaceEstimations{
			announcement: a,
			sizes:        make([]uint64, 0, 1),
		}
		s.mItems[k] = entry
	}

	entry.sizes = append(entry.sizes, a.Value())

	return nil
}
// Close is a no-op: it ignores the context and always returns nil.
func (s *Storage) Close(context.Context) error {
return nil
}
// Iterate goes through all the lists with the key (container ID, epoch),
// calculates the final grade for all values, and passes it to the handler.
//
// Only entries whose stored announcement is accepted by the filter f are
// processed. For each of them the handler h receives a copy of the
// announcement with its value set to the final grade (see finalEstimation).
// Iteration stops at the first non-nil error returned by h, and that error
// is returned. The iteration order is the (unspecified) map order.
//
// The read lock is held for the whole call, including while f and h run,
// so neither of them may call Put on the same Storage.
func (s *Storage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) (err error) {
	s.mtx.RLock()
	// Deferred so that the lock is released even if f or h panics.
	defer s.mtx.RUnlock()

	for _, v := range s.mItems {
		if !f(v.announcement) {
			continue
		}

		// Only the read lock is held, so the stored entry must not be
		// written to: concurrent Iterate calls would race on it. Set the
		// computed value on a local copy instead.
		// NOTE(review): assumes container.SizeEstimation is safe to copy
		// by value, as Put and the handler signature already do - confirm.
		a := v.announcement
		a.SetValue(finalEstimation(v.sizes))

		if err = h(a); err != nil {
			return err
		}
	}

	return nil
}
// finalEstimation reduces the saved values to a single estimation.
//
// The values are sorted; when there are at least ten of them, only those
// ranked between the 10th and the 90th percentile are kept (the index
// range [percentile(10), percentile(90)) of the sorted list). The result
// is the integer mean (rounded down) of the kept values.
//
// An empty input yields 0. The input slice is never modified: sorting is
// done on a private copy, so the function may be called on shared data by
// several readers at once. The sum is accumulated in 128 bits, so large
// values cannot wrap around and distort the mean.
func finalEstimation(vals []uint64) uint64 {
	if len(vals) == 0 {
		// Nothing to average; also avoids dividing by zero below.
		return 0
	}

	sorted := make([]uint64, len(vals))
	copy(sorted, vals)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i] < sorted[j]
	})

	const (
		lowerRank = 10
		upperRank = 90
	)

	window := sorted
	if len(sorted) >= lowerRank {
		lowerInd := percentile(lowerRank, sorted)
		upperInd := percentile(upperRank, sorted)
		// For ten or more values upperInd > lowerInd, so window is non-empty.
		window = sorted[lowerInd:upperInd]
	}

	// 128-bit sum kept as (hi, lo).
	var hi, lo uint64
	for _, v := range window {
		var carry uint64
		lo, carry = bits.Add64(lo, v, 0)
		hi += carry
	}

	// Each addition carries at most once, so hi < len(window) and the
	// quotient always fits in 64 bits (bits.Div64 cannot panic here).
	mean, _ := bits.Div64(hi, lo, uint64(len(window)))

	return mean
}

// percentile returns the index in vals (assumed sorted) that corresponds
// to the given rank, expressed in percent: len(vals)*rank/100, rounded down.
func percentile(rank int, vals []uint64) int {
	return len(vals) * rank / 100
}

View file

@ -1,50 +0,0 @@
package loadstorage
import (
"math/rand"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
// TestStorage saves a batch of random values for a single (epoch, container)
// pair and checks that Iterate visits exactly one entry whose value equals
// finalEstimation of the saved values.
func TestStorage(t *testing.T) {
	const (
		epoch       uint64 = 13
		opinionsNum        = 100
	)

	var a container.SizeEstimation
	a.SetContainer(cidtest.ID())
	a.SetEpoch(epoch)

	s := New(Prm{})

	opinions := make([]uint64, 0, opinionsNum)
	for i := 0; i < opinionsNum; i++ {
		v := rand.Uint64()
		opinions = append(opinions, v)

		a.SetValue(v)
		require.NoError(t, s.Put(a))
	}

	visited := 0

	byEpoch := func(ai container.SizeEstimation) bool {
		return ai.Epoch() == epoch
	}
	verify := func(ai container.SizeEstimation) error {
		visited++

		require.Equal(t, epoch, ai.Epoch())
		require.Equal(t, a.Container(), ai.Container())
		require.Equal(t, finalEstimation(opinions), ai.Value())

		return nil
	}

	require.NoError(t, s.Iterate(byEpoch, verify))
	require.Equal(t, 1, visited)
}