Compare commits
7 commits
master
...
bugfix/rev
Author | SHA1 | Date | |
---|---|---|---|
1bf57815f3 | |||
5acc13fa94 | |||
e0f0b93b5e | |||
eb5248621a | |||
02be6a4341 | |||
368774be95 | |||
b9ef294b99 |
24 changed files with 362 additions and 2029 deletions
|
@ -345,8 +345,7 @@ type internals struct {
|
|||
apiVersion version.Version
|
||||
healthStatus *atomic.Int32
|
||||
// is node under maintenance
|
||||
isMaintenance atomic.Bool
|
||||
alreadyBootstraped bool
|
||||
isMaintenance atomic.Bool
|
||||
}
|
||||
|
||||
// starts node's maintenance.
|
||||
|
|
|
@ -3,44 +3,22 @@ package main
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
containerV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
containerGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
||||
containerTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/container/grpc"
|
||||
containerService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container"
|
||||
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||
loadroute "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route"
|
||||
placementrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/route/placement"
|
||||
loadstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/storage"
|
||||
containerMorph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/morph"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apiClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
startEstimationNotifyEvent = "StartEstimation"
|
||||
stopEstimationNotifyEvent = "StopEstimation"
|
||||
)
|
||||
|
||||
func initContainerService(ctx context.Context, c *cfg) {
|
||||
func initContainerService(_ context.Context, c *cfg) {
|
||||
// container wrapper that tries to invoke notary
|
||||
// requests if chain is configured so
|
||||
wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary())
|
||||
|
@ -52,44 +30,10 @@ func initContainerService(ctx context.Context, c *cfg) {
|
|||
|
||||
cnrRdr, cnrWrt := configureEACLAndContainerSources(c, wrap, cnrSrc)
|
||||
|
||||
loadAccumulator := loadstorage.New(loadstorage.Prm{})
|
||||
|
||||
loadPlacementBuilder := &loadPlacementBuilder{
|
||||
log: c.log,
|
||||
nmSrc: c.netMapSource,
|
||||
cnrSrc: cnrSrc,
|
||||
}
|
||||
|
||||
routeBuilder := placementrouter.New(placementrouter.Prm{
|
||||
PlacementBuilder: loadPlacementBuilder,
|
||||
})
|
||||
|
||||
loadRouter := loadroute.New(
|
||||
loadroute.Prm{
|
||||
LocalServerInfo: c,
|
||||
RemoteWriterProvider: &remoteLoadAnnounceProvider{
|
||||
key: &c.key.PrivateKey,
|
||||
netmapKeys: c,
|
||||
clientCache: c.bgClientCache,
|
||||
deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator),
|
||||
},
|
||||
Builder: routeBuilder,
|
||||
},
|
||||
loadroute.WithLogger(c.log),
|
||||
)
|
||||
|
||||
setLoadController(ctx, c, loadRouter, loadAccumulator)
|
||||
|
||||
server := containerTransportGRPC.New(
|
||||
containerService.NewSignService(
|
||||
&c.key.PrivateKey,
|
||||
&usedSpaceService{
|
||||
Server: containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
||||
loadWriterProvider: loadRouter,
|
||||
loadPlacementBuilder: loadPlacementBuilder,
|
||||
routeBuilder: routeBuilder,
|
||||
cfg: c,
|
||||
},
|
||||
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
||||
),
|
||||
)
|
||||
|
||||
|
@ -178,50 +122,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
return cnrRdr, cnrWrt
|
||||
}
|
||||
|
||||
func setLoadController(ctx context.Context, c *cfg, loadRouter *loadroute.Router, loadAccumulator *loadstorage.Storage) {
|
||||
pubKey := c.key.PublicKey().Bytes()
|
||||
|
||||
// container wrapper that always sends non-notary
|
||||
// requests
|
||||
wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
|
||||
fatalOnErr(err)
|
||||
|
||||
resultWriter := &morphLoadWriter{
|
||||
log: c.log,
|
||||
cnrMorphClient: wrapperNoNotary,
|
||||
key: pubKey,
|
||||
}
|
||||
|
||||
localMetrics := &localStorageLoad{
|
||||
log: c.log,
|
||||
engine: c.cfgObject.cfgLocalStorage.localStorage,
|
||||
}
|
||||
|
||||
ctrl := loadcontroller.New(
|
||||
loadcontroller.Prm{
|
||||
LocalMetrics: loadcontroller.SimpleIteratorProvider(localMetrics),
|
||||
AnnouncementAccumulator: loadcontroller.SimpleIteratorProvider(loadAccumulator),
|
||||
LocalAnnouncementTarget: loadRouter,
|
||||
ResultReceiver: loadcontroller.SimpleWriterProvider(resultWriter),
|
||||
},
|
||||
loadcontroller.WithLogger(c.log),
|
||||
)
|
||||
|
||||
setContainerNotificationParser(c, startEstimationNotifyEvent, containerEvent.ParseStartEstimation)
|
||||
addContainerAsyncNotificationHandler(c, startEstimationNotifyEvent, func(ev event.Event) {
|
||||
ctrl.Start(ctx, loadcontroller.StartPrm{
|
||||
Epoch: ev.(containerEvent.StartEstimation).Epoch(),
|
||||
})
|
||||
})
|
||||
|
||||
setContainerNotificationParser(c, stopEstimationNotifyEvent, containerEvent.ParseStopEstimation)
|
||||
addContainerAsyncNotificationHandler(c, stopEstimationNotifyEvent, func(ev event.Event) {
|
||||
ctrl.Stop(ctx, loadcontroller.StopPrm{
|
||||
Epoch: ev.(containerEvent.StopEstimation).Epoch(),
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// addContainerNotificationHandler adds handler that will be executed synchronously.
|
||||
func addContainerNotificationHandler(c *cfg, sTyp string, h event.Handler) {
|
||||
typ := event.TypeFromString(sTyp)
|
||||
|
@ -284,219 +184,6 @@ func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationPar
|
|||
c.cfgContainer.parsers[typ] = p
|
||||
}
|
||||
|
||||
type morphLoadWriter struct {
|
||||
log *logger.Logger
|
||||
|
||||
cnrMorphClient *cntClient.Client
|
||||
|
||||
key []byte
|
||||
}
|
||||
|
||||
func (w *morphLoadWriter) Put(a containerSDK.SizeEstimation) error {
|
||||
w.log.Debug(logs.FrostFSNodeSaveUsedSpaceAnnouncementInContract,
|
||||
zap.Uint64("epoch", a.Epoch()),
|
||||
zap.Stringer("cid", a.Container()),
|
||||
zap.Uint64("size", a.Value()),
|
||||
)
|
||||
|
||||
prm := cntClient.AnnounceLoadPrm{}
|
||||
|
||||
prm.SetAnnouncement(a)
|
||||
prm.SetReporter(w.key)
|
||||
|
||||
return w.cnrMorphClient.AnnounceLoad(prm)
|
||||
}
|
||||
|
||||
func (*morphLoadWriter) Close(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type nopLoadWriter struct{}
|
||||
|
||||
func (nopLoadWriter) Put(containerSDK.SizeEstimation) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nopLoadWriter) Close(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type remoteLoadAnnounceProvider struct {
|
||||
key *ecdsa.PrivateKey
|
||||
|
||||
netmapKeys netmapCore.AnnouncedKeys
|
||||
|
||||
clientCache interface {
|
||||
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
deadEndProvider loadcontroller.WriterProvider
|
||||
}
|
||||
|
||||
func (r *remoteLoadAnnounceProvider) InitRemote(srv loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error) {
|
||||
if srv == nil {
|
||||
return r.deadEndProvider, nil
|
||||
}
|
||||
|
||||
if r.netmapKeys.IsLocalKey(srv.PublicKey()) {
|
||||
// if local => return no-op writer
|
||||
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
|
||||
}
|
||||
|
||||
var info client.NodeInfo
|
||||
|
||||
err := client.NodeInfoFromRawNetmapElement(&info, srv)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||
}
|
||||
|
||||
c, err := r.clientCache.Get(info)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize API client: %w", err)
|
||||
}
|
||||
|
||||
return &remoteLoadAnnounceWriterProvider{
|
||||
client: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type remoteLoadAnnounceWriterProvider struct {
|
||||
client client.Client
|
||||
}
|
||||
|
||||
func (p *remoteLoadAnnounceWriterProvider) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
|
||||
return &remoteLoadAnnounceWriter{
|
||||
client: p.client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type remoteLoadAnnounceWriter struct {
|
||||
client client.Client
|
||||
|
||||
buf []containerSDK.SizeEstimation
|
||||
}
|
||||
|
||||
func (r *remoteLoadAnnounceWriter) Put(a containerSDK.SizeEstimation) error {
|
||||
r.buf = append(r.buf, a)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *remoteLoadAnnounceWriter) Close(ctx context.Context) error {
|
||||
cliPrm := apiClient.PrmAnnounceSpace{
|
||||
Announcements: r.buf,
|
||||
}
|
||||
|
||||
_, err := r.client.ContainerAnnounceUsedSpace(ctx, cliPrm)
|
||||
return err
|
||||
}
|
||||
|
||||
type loadPlacementBuilder struct {
|
||||
log *logger.Logger
|
||||
|
||||
nmSrc netmapCore.Source
|
||||
|
||||
cnrSrc containerCore.Source
|
||||
}
|
||||
|
||||
func (l *loadPlacementBuilder) BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error) {
|
||||
cnrNodes, nm, err := l.buildPlacement(epoch, cnr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
const pivotPrefix = "load_announcement_"
|
||||
|
||||
pivot := []byte(
|
||||
pivotPrefix + strconv.FormatUint(epoch, 10),
|
||||
)
|
||||
|
||||
placement, err := nm.PlacementVectors(cnrNodes, pivot)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not build placement vectors: %w", err)
|
||||
}
|
||||
|
||||
return placement, nil
|
||||
}
|
||||
|
||||
func (l *loadPlacementBuilder) buildPlacement(epoch uint64, idCnr cid.ID) ([][]netmap.NodeInfo, *netmap.NetMap, error) {
|
||||
cnr, err := l.cnrSrc.Get(idCnr)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
nm, err := l.nmSrc.GetNetMapByEpoch(epoch)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("could not get network map: %w", err)
|
||||
}
|
||||
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
idCnr.Encode(binCnr)
|
||||
|
||||
cnrNodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), binCnr)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("could not build container nodes: %w", err)
|
||||
}
|
||||
|
||||
return cnrNodes, nm, nil
|
||||
}
|
||||
|
||||
type localStorageLoad struct {
|
||||
log *logger.Logger
|
||||
|
||||
engine *engine.StorageEngine
|
||||
}
|
||||
|
||||
func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
|
||||
idList, err := engine.ListContainers(context.TODO(), d.engine)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list containers on engine failure: %w", err)
|
||||
}
|
||||
|
||||
for i := range idList {
|
||||
sz, err := engine.ContainerSize(d.engine, idList[i])
|
||||
if err != nil {
|
||||
d.log.Debug(logs.FrostFSNodeFailedToCalculateContainerSizeInStorageEngine,
|
||||
zap.Stringer("cid", idList[i]),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
d.log.Debug(logs.FrostFSNodeContainerSizeInStorageEngineCalculatedSuccessfully,
|
||||
zap.Uint64("size", sz),
|
||||
zap.Stringer("cid", idList[i]),
|
||||
)
|
||||
|
||||
var a containerSDK.SizeEstimation
|
||||
a.SetContainer(idList[i])
|
||||
a.SetValue(sz)
|
||||
|
||||
if f != nil && !f(a) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := h(a); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type usedSpaceService struct {
|
||||
containerService.Server
|
||||
|
||||
loadWriterProvider loadcontroller.WriterProvider
|
||||
|
||||
loadPlacementBuilder *loadPlacementBuilder
|
||||
|
||||
routeBuilder loadroute.Builder
|
||||
|
||||
cfg *cfg
|
||||
}
|
||||
|
||||
func (c *cfg) PublicKey() []byte {
|
||||
return nodeKeyFromNetmap(c)
|
||||
}
|
||||
|
@ -517,125 +204,6 @@ func (c *cfg) ExternalAddresses() []string {
|
|||
return c.cfgNodeInfo.localInfo.ExternalAddresses()
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) PublicKey() []byte {
|
||||
return nodeKeyFromNetmap(c.cfg)
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) IterateAddresses(f func(string) bool) {
|
||||
c.cfg.iterateNetworkAddresses(f)
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) NumberOfAddresses() int {
|
||||
return c.cfg.addressNum()
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) ExternalAddresses() []string {
|
||||
return c.cfg.ExternalAddresses()
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
|
||||
var passedRoute []loadcontroller.ServerInfo
|
||||
|
||||
for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() {
|
||||
passedRoute = append(passedRoute, &containerOnlyKeyRemoteServerInfo{
|
||||
key: hdr.GetBodySignature().GetKey(),
|
||||
})
|
||||
}
|
||||
|
||||
for left, right := 0, len(passedRoute)-1; left < right; left, right = left+1, right-1 {
|
||||
passedRoute[left], passedRoute[right] = passedRoute[right], passedRoute[left]
|
||||
}
|
||||
|
||||
passedRoute = append(passedRoute, c)
|
||||
|
||||
w, err := c.loadWriterProvider.InitWriter(passedRoute)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize container's used space writer: %w", err)
|
||||
}
|
||||
|
||||
var est containerSDK.SizeEstimation
|
||||
|
||||
for _, aV2 := range req.GetBody().GetAnnouncements() {
|
||||
err = est.ReadFromV2(aV2)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid size announcement: %w", err)
|
||||
}
|
||||
|
||||
if err := c.processLoadValue(ctx, est, passedRoute, w); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
respBody := new(containerV2.AnnounceUsedSpaceResponseBody)
|
||||
|
||||
resp := new(containerV2.AnnounceUsedSpaceResponse)
|
||||
resp.SetBody(respBody)
|
||||
|
||||
c.cfg.respSvc.SetMeta(resp)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
var errNodeOutsideContainer = errors.New("node outside the container")
|
||||
|
||||
type containerOnlyKeyRemoteServerInfo struct {
|
||||
key []byte
|
||||
}
|
||||
|
||||
func (i *containerOnlyKeyRemoteServerInfo) PublicKey() []byte {
|
||||
return i.key
|
||||
}
|
||||
|
||||
func (*containerOnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) {
|
||||
}
|
||||
|
||||
func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (*containerOnlyKeyRemoteServerInfo) ExternalAddresses() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID, key []byte) (bool, error) {
|
||||
cnrNodes, _, err := l.buildPlacement(epoch, cnr)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for i := range cnrNodes {
|
||||
for j := range cnrNodes[i] {
|
||||
if bytes.Equal(cnrNodes[i][j].PublicKey(), key) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) processLoadValue(_ context.Context, a containerSDK.SizeEstimation,
|
||||
route []loadcontroller.ServerInfo, w loadcontroller.Writer) error {
|
||||
fromCnr, err := c.loadPlacementBuilder.isNodeFromContainerKey(a.Epoch(), a.Container(), route[0].PublicKey())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not verify that the sender belongs to the container: %w", err)
|
||||
} else if !fromCnr {
|
||||
return errNodeOutsideContainer
|
||||
}
|
||||
|
||||
err = loadroute.CheckRoute(c.routeBuilder, a, route)
|
||||
if err != nil {
|
||||
return fmt.Errorf("wrong route of container's used space value: %w", err)
|
||||
}
|
||||
|
||||
err = w.Put(a)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not write container's used space value: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// implements interface required by container service provided by morph executor.
|
||||
type morphContainerReader struct {
|
||||
eacl containerCore.EACLSource
|
||||
|
|
|
@ -179,15 +179,8 @@ func addNewEpochNotificationHandlers(c *cfg) {
|
|||
return
|
||||
}
|
||||
|
||||
n := ev.(netmapEvent.NewEpoch).EpochNumber()
|
||||
|
||||
const reBootstrapInterval = 2
|
||||
|
||||
if (n-c.cfgNetmap.startEpoch)%reBootstrapInterval == 0 {
|
||||
err := c.bootstrap()
|
||||
if err != nil {
|
||||
c.log.Warn(logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
|
||||
}
|
||||
if err := c.bootstrap(); err != nil {
|
||||
c.log.Warn(logs.FrostFSNodeCantSendRebootstrapTx, zap.Error(err))
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -227,10 +220,6 @@ func bootstrapNode(c *cfg) {
|
|||
c.log.Info(logs.FrostFSNodeNodeIsUnderMaintenanceSkipInitialBootstrap)
|
||||
return
|
||||
}
|
||||
if c.alreadyBootstraped {
|
||||
c.log.Info(logs.NetmapNodeAlreadyInCandidateListOnlineSkipInitialBootstrap)
|
||||
return
|
||||
}
|
||||
err := c.bootstrap()
|
||||
fatalOnErrDetails("bootstrap error", err)
|
||||
}
|
||||
|
@ -263,7 +252,7 @@ func initNetmapState(c *cfg) {
|
|||
fatalOnErrDetails("could not initialize current epoch number", err)
|
||||
|
||||
var ni *netmapSDK.NodeInfo
|
||||
ni, c.alreadyBootstraped, err = c.netmapInitLocalNodeState(epoch)
|
||||
ni, err = c.netmapInitLocalNodeState(epoch)
|
||||
fatalOnErrDetails("could not init network state", err)
|
||||
|
||||
stateWord := nodeState(ni)
|
||||
|
@ -282,13 +271,6 @@ func initNetmapState(c *cfg) {
|
|||
c.handleLocalNodeInfo(ni)
|
||||
}
|
||||
|
||||
func sameNodeInfo(a, b *netmapSDK.NodeInfo) bool {
|
||||
// Suboptimal, but we do this once on the node startup.
|
||||
rawA := a.Marshal()
|
||||
rawB := b.Marshal()
|
||||
return bytes.Equal(rawA, rawB)
|
||||
}
|
||||
|
||||
func nodeState(ni *netmapSDK.NodeInfo) string {
|
||||
if ni != nil {
|
||||
switch {
|
||||
|
@ -303,29 +285,27 @@ func nodeState(ni *netmapSDK.NodeInfo) string {
|
|||
return "undefined"
|
||||
}
|
||||
|
||||
func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, bool, error) {
|
||||
func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
|
||||
nmNodes, err := c.cfgNetmap.wrapper.GetCandidates()
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var candidate *netmapSDK.NodeInfo
|
||||
alreadyBootstraped := false
|
||||
for i := range nmNodes {
|
||||
if bytes.Equal(nmNodes[i].PublicKey(), c.binPublicKey) {
|
||||
candidate = &nmNodes[i]
|
||||
alreadyBootstraped = candidate.IsOnline() && sameNodeInfo(&c.cfgNodeInfo.localInfo, candidate)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
node, err := c.netmapLocalNodeState(epoch)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if candidate == nil {
|
||||
return node, false, nil
|
||||
return node, nil
|
||||
}
|
||||
|
||||
nmState := nodeState(node)
|
||||
|
@ -337,7 +317,7 @@ func (c *cfg) netmapInitLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, bool,
|
|||
zap.String("netmap", nmState),
|
||||
zap.String("candidate", candidateState))
|
||||
}
|
||||
return candidate, alreadyBootstraped, nil
|
||||
return candidate, nil
|
||||
}
|
||||
|
||||
func (c *cfg) netmapLocalNodeState(epoch uint64) (*netmapSDK.NodeInfo, error) {
|
||||
|
|
2
go.mod
2
go.mod
|
@ -6,7 +6,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230911122224-ac8fc6d4400c
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230928142024-84b9d29fc98c
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
github.com/cheggaaa/pb v1.0.29
|
||||
|
|
4
go.sum
4
go.sum
|
@ -393,8 +393,8 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV
|
|||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230911122224-ac8fc6d4400c h1:gD+dj5IZx9jlniDu8TlLQdRGCd8KIOzYjjdDd1KcVdI=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230911122224-ac8fc6d4400c/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230928142024-84b9d29fc98c h1:c8mduKlc8Zioppz5o06QRYS5KYX3BFRO+NgKj2q6kD8=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230928142024-84b9d29fc98c/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/nep17"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
||||
sc "github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||
|
@ -157,6 +158,8 @@ func (e *notHaltStateError) Error() string {
|
|||
)
|
||||
}
|
||||
|
||||
var errEmptyInvocationScript = errors.New("got empty invocation script from neo node")
|
||||
|
||||
// implementation of error interface for FrostFS-specific errors.
|
||||
type frostfsError struct {
|
||||
err error
|
||||
|
@ -470,6 +473,64 @@ func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
|
|||
return c.rolemgmt.GetDesignatedByRole(r, height)
|
||||
}
|
||||
|
||||
// tries to resolve sc.Parameter from the arg.
|
||||
//
|
||||
// Wraps any error to frostfsError.
|
||||
func toStackParameter(value any) (sc.Parameter, error) {
|
||||
var res = sc.Parameter{
|
||||
Value: value,
|
||||
}
|
||||
|
||||
switch v := value.(type) {
|
||||
case []byte:
|
||||
res.Type = sc.ByteArrayType
|
||||
case int:
|
||||
res.Type = sc.IntegerType
|
||||
res.Value = big.NewInt(int64(v))
|
||||
case int64:
|
||||
res.Type = sc.IntegerType
|
||||
res.Value = big.NewInt(v)
|
||||
case uint64:
|
||||
res.Type = sc.IntegerType
|
||||
res.Value = new(big.Int).SetUint64(v)
|
||||
case [][]byte:
|
||||
arr := make([]sc.Parameter, 0, len(v))
|
||||
for i := range v {
|
||||
elem, err := toStackParameter(v[i])
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
|
||||
arr = append(arr, elem)
|
||||
}
|
||||
|
||||
res.Type = sc.ArrayType
|
||||
res.Value = arr
|
||||
case string:
|
||||
res.Type = sc.StringType
|
||||
case util.Uint160:
|
||||
res.Type = sc.ByteArrayType
|
||||
res.Value = v.BytesBE()
|
||||
case noderoles.Role:
|
||||
res.Type = sc.IntegerType
|
||||
res.Value = big.NewInt(int64(v))
|
||||
case keys.PublicKeys:
|
||||
arr := make([][]byte, 0, len(v))
|
||||
for i := range v {
|
||||
arr = append(arr, v[i].Bytes())
|
||||
}
|
||||
|
||||
return toStackParameter(arr)
|
||||
case bool:
|
||||
res.Type = sc.BoolType
|
||||
res.Value = v
|
||||
default:
|
||||
return res, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value))
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// MagicNumber returns the magic number of the network
|
||||
// to which the underlying RPC node client is connected.
|
||||
func (c *Client) MagicNumber() (uint64, error) {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"crypto/elliptic"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -20,20 +19,19 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/notary"
|
||||
sc "github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
notaryInfo struct {
|
||||
txValidTime uint32 // minimum amount of blocks when mainTx will be valid
|
||||
roundTime uint32 // extra amount of blocks to synchronize sidechain height diff of inner ring nodes
|
||||
txValidTime uint32 // minimum amount of blocks when mainTx will be valid
|
||||
roundTime uint32 // extra amount of blocks to synchronize sidechain height diff of inner ring nodes
|
||||
fallbackTime uint32 // mainTx's ValidUntilBlock - fallbackTime + 1 is when fallbackTx is sent
|
||||
|
||||
alphabetSource AlphabetKeys // source of alphabet node keys to prepare witness
|
||||
|
||||
|
@ -44,7 +42,7 @@ type (
|
|||
notaryCfg struct {
|
||||
proxy util.Uint160
|
||||
|
||||
txValidTime, roundTime uint32
|
||||
txValidTime, roundTime, fallbackTime uint32
|
||||
|
||||
alphabetSource AlphabetKeys
|
||||
}
|
||||
|
@ -54,8 +52,9 @@ type (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultNotaryValidTime = 50
|
||||
defaultNotaryRoundTime = 100
|
||||
defaultNotaryValidTime = 50
|
||||
defaultNotaryRoundTime = 100
|
||||
defaultNotaryFallbackTime = 40
|
||||
|
||||
notaryBalanceOfMethod = "balanceOf"
|
||||
notaryExpirationOfMethod = "expirationOf"
|
||||
|
@ -71,6 +70,7 @@ func defaultNotaryConfig(c *Client) *notaryCfg {
|
|||
return ¬aryCfg{
|
||||
txValidTime: defaultNotaryValidTime,
|
||||
roundTime: defaultNotaryRoundTime,
|
||||
fallbackTime: defaultNotaryFallbackTime,
|
||||
alphabetSource: c.Committee,
|
||||
}
|
||||
}
|
||||
|
@ -105,6 +105,7 @@ func (c *Client) EnableNotarySupport(opts ...NotaryOption) error {
|
|||
proxy: cfg.proxy,
|
||||
txValidTime: cfg.txValidTime,
|
||||
roundTime: cfg.roundTime,
|
||||
fallbackTime: cfg.fallbackTime,
|
||||
alphabetSource: cfg.alphabetSource,
|
||||
notary: notary.Hash,
|
||||
}
|
||||
|
@ -408,32 +409,33 @@ func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error {
|
|||
return fmt.Errorf("could not fetch current alphabet keys: %w", err)
|
||||
}
|
||||
|
||||
cosigners, err := c.notaryCosignersFromTx(mainTx, alphabetList)
|
||||
multiaddrAccount, err := c.notaryMultisigAccount(alphabetList, false, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nAct, err := notary.NewActor(c.client, cosigners, c.acc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// mainTX is expected to be pre-validated: second witness must exist and be empty
|
||||
mainTx.Scripts[1].VerificationScript = multiaddrAccount.GetVerificationScript()
|
||||
mainTx.Scripts[1].InvocationScript = append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
multiaddrAccount.SignHashable(c.rpcActor.GetNetwork(), mainTx)...,
|
||||
)
|
||||
|
||||
// Sign exactly the same transaction we've got from the received Notary request.
|
||||
err = nAct.Sign(mainTx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("faield to sign notary request: %w", err)
|
||||
}
|
||||
|
||||
mainH, fbH, untilActual, err := nAct.Notarize(mainTx, nil)
|
||||
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
|
||||
resp, err := c.client.SignAndPushP2PNotaryRequest(mainTx,
|
||||
[]byte{byte(opcode.RET)},
|
||||
-1,
|
||||
0,
|
||||
c.notary.fallbackTime,
|
||||
c.acc)
|
||||
|
||||
if err != nil && !alreadyOnChainError(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
c.logger.Debug(logs.ClientNotaryRequestWithPreparedMainTXInvoked,
|
||||
zap.String("tx_hash", mainH.StringLE()),
|
||||
zap.Uint32("valid_until_block", untilActual),
|
||||
zap.String("fallback_hash", fbH.StringLE()))
|
||||
zap.Uint32("fallback_valid_for", c.notary.fallbackTime),
|
||||
zap.Stringer("tx_hash", resp.Hash().Reverse()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -444,159 +446,75 @@ func (c *Client) notaryInvokeAsCommittee(method string, nonce, vub uint32, args
|
|||
}
|
||||
|
||||
func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint160, nonce uint32, vub *uint32, method string, args ...any) error {
|
||||
start := time.Now()
|
||||
success := false
|
||||
defer func() {
|
||||
c.metrics.ObserveInvoke("notaryInvoke", contract.String(), method, success, time.Since(start))
|
||||
}()
|
||||
|
||||
alphabetList, err := c.notary.alphabetSource()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
until, err := c.getUntilValue(vub)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cosigners, err := c.notaryCosigners(invokedByAlpha, alphabetList, committee)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nAct, err := notary.NewActor(c.client, cosigners, c.acc)
|
||||
params, err := invocationParams(args...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mainH, fbH, untilActual, err := nAct.Notarize(nAct.MakeTunedCall(contract, method, nil, func(r *result.Invoke, t *transaction.Transaction) error {
|
||||
if r.State != vmstate.Halt.String() {
|
||||
return wrapFrostFSError(¬HaltStateError{state: r.State, exception: r.FaultException})
|
||||
}
|
||||
test, err := c.makeTestInvocation(contract, method, params, cosigners)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.ValidUntilBlock = until
|
||||
t.Nonce = nonce
|
||||
multiaddrAccount, err := c.notaryMultisigAccount(alphabetList, committee, invokedByAlpha)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}, args...))
|
||||
until, err := c.getUntilValue(vub)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mainTx, err := c.buildMainTx(invokedByAlpha, nonce, alphabetList, test, cosigners, multiaddrAccount, until)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
|
||||
resp, err := c.client.SignAndPushP2PNotaryRequest(mainTx,
|
||||
[]byte{byte(opcode.RET)},
|
||||
-1,
|
||||
0,
|
||||
c.notary.fallbackTime,
|
||||
c.acc)
|
||||
if err != nil && !alreadyOnChainError(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
c.logger.Debug(logs.ClientNotaryRequestInvoked,
|
||||
zap.String("method", method),
|
||||
zap.Uint32("valid_until_block", untilActual),
|
||||
zap.String("tx_hash", mainH.StringLE()),
|
||||
zap.String("fallback_hash", fbH.StringLE()))
|
||||
zap.Uint32("valid_until_block", until),
|
||||
zap.Uint32("fallback_valid_for", c.notary.fallbackTime),
|
||||
zap.Stringer("tx_hash", resp.Hash().Reverse()))
|
||||
|
||||
success = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) notaryCosignersFromTx(mainTx *transaction.Transaction, alphabetList keys.PublicKeys) ([]actor.SignerAccount, error) {
|
||||
multiaddrAccount, err := c.notaryMultisigAccount(alphabetList, false, true)
|
||||
func (c *Client) makeTestInvocation(contract util.Uint160, method string, params []sc.Parameter, cosigners []transaction.Signer) (*result.Invoke, error) {
|
||||
test, err := c.client.InvokeFunction(contract, method, params, cosigners)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Here we need to add a committee signature (second witness) to the pre-validated
|
||||
// main transaction without creating a new one. However, Notary actor demands the
|
||||
// proper set of signers for constructor, thus, fill it from the main transaction's signers list.
|
||||
s := make([]actor.SignerAccount, 2, 3)
|
||||
s[0] = actor.SignerAccount{
|
||||
// Proxy contract that will pay for the execution.
|
||||
Signer: mainTx.Signers[0],
|
||||
Account: notary.FakeContractAccount(mainTx.Signers[0].Account),
|
||||
}
|
||||
s[1] = actor.SignerAccount{
|
||||
// Inner ring multisignature.
|
||||
Signer: mainTx.Signers[1],
|
||||
Account: multiaddrAccount,
|
||||
}
|
||||
if len(mainTx.Signers) > 3 {
|
||||
// Invoker signature (simple signature account of storage node is expected).
|
||||
var acc *wallet.Account
|
||||
script := mainTx.Scripts[2].VerificationScript
|
||||
if len(script) == 0 {
|
||||
acc = notary.FakeContractAccount(mainTx.Signers[2].Account)
|
||||
} else {
|
||||
pubBytes, ok := vm.ParseSignatureContract(script)
|
||||
if ok {
|
||||
pub, err := keys.NewPublicKeyFromBytes(pubBytes, elliptic.P256())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key: %w", err)
|
||||
}
|
||||
acc = notary.FakeSimpleAccount(pub)
|
||||
} else {
|
||||
m, pubsBytes, ok := vm.ParseMultiSigContract(script)
|
||||
if !ok {
|
||||
return nil, errors.New("failed to parse verification script of signer #2: unknown witness type")
|
||||
}
|
||||
pubs := make(keys.PublicKeys, len(pubsBytes))
|
||||
for i := range pubs {
|
||||
pubs[i], err = keys.NewPublicKeyFromBytes(pubsBytes[i], elliptic.P256())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key #%d: %w", i, err)
|
||||
}
|
||||
}
|
||||
acc, err = notary.FakeMultisigAccount(m, pubs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create fake account for signer #2: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
s = append(s, actor.SignerAccount{
|
||||
Signer: mainTx.Signers[2],
|
||||
Account: acc,
|
||||
})
|
||||
if test.State != HaltState {
|
||||
return nil, wrapFrostFSError(¬HaltStateError{state: test.State, exception: test.FaultException})
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, committee bool) ([]actor.SignerAccount, error) {
|
||||
multiaddrAccount, err := c.notaryMultisigAccount(ir, committee, invokedByAlpha)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if len(test.Script) == 0 {
|
||||
return nil, wrapFrostFSError(errEmptyInvocationScript)
|
||||
}
|
||||
s := make([]actor.SignerAccount, 2, 3)
|
||||
// Proxy contract that will pay for the execution.
|
||||
s[0] = actor.SignerAccount{
|
||||
Signer: transaction.Signer{
|
||||
Account: c.notary.proxy,
|
||||
Scopes: transaction.None,
|
||||
},
|
||||
Account: notary.FakeContractAccount(c.notary.proxy),
|
||||
}
|
||||
// Inner ring multisignature.
|
||||
s[1] = actor.SignerAccount{
|
||||
Signer: transaction.Signer{
|
||||
Account: multiaddrAccount.ScriptHash(),
|
||||
Scopes: c.cfg.signer.Scopes,
|
||||
AllowedContracts: c.cfg.signer.AllowedContracts,
|
||||
AllowedGroups: c.cfg.signer.AllowedGroups,
|
||||
},
|
||||
Account: multiaddrAccount,
|
||||
}
|
||||
|
||||
if !invokedByAlpha {
|
||||
// Invoker signature.
|
||||
s = append(s, actor.SignerAccount{
|
||||
Signer: transaction.Signer{
|
||||
Account: hash.Hash160(c.acc.GetVerificationScript()),
|
||||
Scopes: c.cfg.signer.Scopes,
|
||||
AllowedContracts: c.cfg.signer.AllowedContracts,
|
||||
AllowedGroups: c.cfg.signer.AllowedGroups,
|
||||
},
|
||||
Account: c.acc,
|
||||
})
|
||||
}
|
||||
|
||||
// The last one is Notary contract that will be added to the signers list
|
||||
// by Notary actor automatically.
|
||||
return s, nil
|
||||
return test, nil
|
||||
}
|
||||
|
||||
func (c *Client) getUntilValue(vub *uint32) (uint32, error) {
|
||||
|
@ -606,6 +524,195 @@ func (c *Client) getUntilValue(vub *uint32) (uint32, error) {
|
|||
return c.notaryTxValidationLimit()
|
||||
}
|
||||
|
||||
func (c *Client) buildMainTx(invokedByAlpha bool, nonce uint32, alphabetList keys.PublicKeys, test *result.Invoke,
|
||||
cosigners []transaction.Signer, multiaddrAccount *wallet.Account, until uint32) (*transaction.Transaction, error) {
|
||||
// after test invocation we build main multisig transaction
|
||||
|
||||
u8n := uint8(len(alphabetList))
|
||||
|
||||
if !invokedByAlpha {
|
||||
u8n++
|
||||
}
|
||||
|
||||
// prepare main tx
|
||||
mainTx := &transaction.Transaction{
|
||||
Nonce: nonce,
|
||||
SystemFee: test.GasConsumed,
|
||||
ValidUntilBlock: until,
|
||||
Script: test.Script,
|
||||
Attributes: []transaction.Attribute{
|
||||
{
|
||||
Type: transaction.NotaryAssistedT,
|
||||
Value: &transaction.NotaryAssisted{NKeys: u8n},
|
||||
},
|
||||
},
|
||||
Signers: cosigners,
|
||||
}
|
||||
|
||||
// calculate notary fee
|
||||
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
|
||||
notaryFee, err := c.client.CalculateNotaryFee(u8n)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// add network fee for cosigners
|
||||
//nolint:staticcheck // waits for neo-go v0.99.3 with notary actors
|
||||
//lint:ignore SA1019 https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/202
|
||||
err = c.client.AddNetworkFee(
|
||||
mainTx,
|
||||
notaryFee,
|
||||
c.notaryAccounts(invokedByAlpha, multiaddrAccount)...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// define witnesses
|
||||
mainTx.Scripts = c.notaryWitnesses(invokedByAlpha, multiaddrAccount, mainTx)
|
||||
|
||||
return mainTx, nil
|
||||
}
|
||||
|
||||
func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, committee bool) ([]transaction.Signer, error) {
|
||||
s := make([]transaction.Signer, 0, 4)
|
||||
|
||||
// first we have proxy contract signature, as it will pay for the execution
|
||||
s = append(s, transaction.Signer{
|
||||
Account: c.notary.proxy,
|
||||
Scopes: transaction.None,
|
||||
})
|
||||
|
||||
// then we have inner ring multiaddress signature
|
||||
m := sigCount(ir, committee)
|
||||
|
||||
multisigScript, err := sc.CreateMultiSigRedeemScript(m, ir)
|
||||
if err != nil {
|
||||
// wrap error as FrostFS-specific since the call is not related to any client
|
||||
return nil, wrapFrostFSError(fmt.Errorf("can't create ir multisig redeem script: %w", err))
|
||||
}
|
||||
|
||||
s = append(s, transaction.Signer{
|
||||
Account: hash.Hash160(multisigScript),
|
||||
Scopes: c.cfg.signer.Scopes,
|
||||
AllowedContracts: c.cfg.signer.AllowedContracts,
|
||||
AllowedGroups: c.cfg.signer.AllowedGroups,
|
||||
})
|
||||
|
||||
if !invokedByAlpha {
|
||||
// then we have invoker signature
|
||||
s = append(s, transaction.Signer{
|
||||
Account: hash.Hash160(c.acc.GetVerificationScript()),
|
||||
Scopes: c.cfg.signer.Scopes,
|
||||
AllowedContracts: c.cfg.signer.AllowedContracts,
|
||||
AllowedGroups: c.cfg.signer.AllowedGroups,
|
||||
})
|
||||
}
|
||||
|
||||
// last one is a placeholder for notary contract signature
|
||||
s = append(s, transaction.Signer{
|
||||
Account: c.notary.notary,
|
||||
Scopes: transaction.None,
|
||||
})
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (c *Client) notaryAccounts(invokedByAlpha bool, multiaddr *wallet.Account) []*wallet.Account {
|
||||
if multiaddr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
a := make([]*wallet.Account, 0, 4)
|
||||
|
||||
// first we have proxy account, as it will pay for the execution
|
||||
a = append(a, notary.FakeContractAccount(c.notary.proxy))
|
||||
|
||||
// then we have inner ring multiaddress account
|
||||
a = append(a, multiaddr)
|
||||
|
||||
if !invokedByAlpha {
|
||||
// then we have invoker account
|
||||
a = append(a, c.acc)
|
||||
}
|
||||
|
||||
// last one is a placeholder for notary contract account
|
||||
a = append(a, &wallet.Account{
|
||||
Contract: &wallet.Contract{},
|
||||
})
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
func (c *Client) notaryWitnesses(invokedByAlpha bool, multiaddr *wallet.Account, tx *transaction.Transaction) []transaction.Witness {
|
||||
if multiaddr == nil || tx == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
w := make([]transaction.Witness, 0, 4)
|
||||
|
||||
// first we have empty proxy witness, because notary will execute `Verify`
|
||||
// method on the proxy contract to check witness
|
||||
w = append(w, transaction.Witness{
|
||||
InvocationScript: []byte{},
|
||||
VerificationScript: []byte{},
|
||||
})
|
||||
|
||||
// then we have inner ring multiaddress witness
|
||||
|
||||
// invocation script should be of the form:
|
||||
// { PUSHDATA1, 64, signatureBytes... }
|
||||
// to pass Notary module verification
|
||||
var invokeScript []byte
|
||||
|
||||
magicNumber := c.rpcActor.GetNetwork()
|
||||
|
||||
if invokedByAlpha {
|
||||
invokeScript = append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
multiaddr.SignHashable(magicNumber, tx)...,
|
||||
)
|
||||
} else {
|
||||
// we can't provide alphabet node signature
|
||||
// because Storage Node doesn't own alphabet's
|
||||
// private key. Thus, add dummy witness with
|
||||
// empty bytes instead of signature
|
||||
invokeScript = append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
make([]byte, 64)...,
|
||||
)
|
||||
}
|
||||
|
||||
w = append(w, transaction.Witness{
|
||||
InvocationScript: invokeScript,
|
||||
VerificationScript: multiaddr.GetVerificationScript(),
|
||||
})
|
||||
|
||||
if !invokedByAlpha {
|
||||
// then we have invoker witness
|
||||
invokeScript = append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
c.acc.SignHashable(magicNumber, tx)...,
|
||||
)
|
||||
|
||||
w = append(w, transaction.Witness{
|
||||
InvocationScript: invokeScript,
|
||||
VerificationScript: c.acc.GetVerificationScript(),
|
||||
})
|
||||
}
|
||||
|
||||
// last one is a placeholder for notary contract witness
|
||||
w = append(w, transaction.Witness{
|
||||
InvocationScript: append(
|
||||
[]byte{byte(opcode.PUSHDATA1), 64},
|
||||
make([]byte, 64)...,
|
||||
),
|
||||
VerificationScript: []byte{},
|
||||
})
|
||||
|
||||
return w
|
||||
}
|
||||
|
||||
func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedByAlpha bool) (*wallet.Account, error) {
|
||||
m := sigCount(ir, committee)
|
||||
|
||||
|
@ -662,6 +769,21 @@ func (c *Client) depositExpirationOf() (int64, error) {
|
|||
return currentTillBig.Int64(), nil
|
||||
}
|
||||
|
||||
func invocationParams(args ...any) ([]sc.Parameter, error) {
|
||||
params := make([]sc.Parameter, 0, len(args))
|
||||
|
||||
for i := range args {
|
||||
param, err := toStackParameter(args[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
params = append(params, param)
|
||||
}
|
||||
|
||||
return params, nil
|
||||
}
|
||||
|
||||
// sigCount returns the number of required signature.
|
||||
// For FrostFS Alphabet M is a 2/3+1 of it (like in dBFT).
|
||||
// If committee is true, returns M as N/2+1.
|
||||
|
@ -699,6 +821,15 @@ func WithAlphabetSource(t AlphabetKeys) NotaryOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithFallbackTime returns a notary support option for client
|
||||
// that specifies amount of blocks before fallbackTx will be sent.
|
||||
// Should be less than TxValidTime.
|
||||
func WithFallbackTime(t uint32) NotaryOption {
|
||||
return func(c *notaryCfg) {
|
||||
c.fallbackTime = t
|
||||
}
|
||||
}
|
||||
|
||||
// WithProxyContract sets proxy contract hash.
|
||||
func WithProxyContract(h util.Uint160) NotaryOption {
|
||||
return func(c *notaryCfg) {
|
||||
|
|
|
@ -192,15 +192,15 @@ func (p Preparator) validateNotaryRequest(nr *payload.P2PNotaryRequest) error {
|
|||
}
|
||||
invokerWitness := ln == 4
|
||||
|
||||
// alphabet node should handle only notary requests that do not yet have inner
|
||||
// ring multisignature filled => such main TXs either have empty invocation script
|
||||
// of the inner ring witness (in case if Notary Actor is used to create request)
|
||||
// or have it filled with dummy bytes (if request was created manually with the old
|
||||
// neo-go API)
|
||||
multiInvScript := nr.MainTransaction.Scripts[1].InvocationScript
|
||||
|
||||
// alphabet node should handle only notary requests
|
||||
// that have been sent unsigned (by storage nodes) =>
|
||||
// such main TXs should have either a dummy or an
|
||||
// empty script as an invocation script
|
||||
//
|
||||
// this check prevents notary flow recursion
|
||||
if !(len(nr.MainTransaction.Scripts[1].InvocationScript) == 0 ||
|
||||
bytes.Equal(nr.MainTransaction.Scripts[1].InvocationScript, p.dummyInvocationScript)) { // compatibility with old version
|
||||
if len(multiInvScript) > 0 && !bytes.Equal(nr.MainTransaction.Scripts[1].InvocationScript, p.dummyInvocationScript) {
|
||||
return ErrTXAlreadyHandled
|
||||
}
|
||||
|
||||
|
@ -227,7 +227,12 @@ func (p Preparator) validateNotaryRequest(nr *payload.P2PNotaryRequest) error {
|
|||
}
|
||||
|
||||
// validate main TX expiration
|
||||
return p.validateExpiration(nr.FallbackTransaction)
|
||||
err = p.validateExpiration(nr.FallbackTransaction)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p Preparator) validateParameterOpcodes(ops []Op) error {
|
||||
|
|
|
@ -1,307 +0,0 @@
|
|||
package loadcontroller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// StartPrm groups the required parameters of the Controller.Start method.
|
||||
type StartPrm struct {
|
||||
// Epoch number by which you want to select
|
||||
// the values of the used space of containers.
|
||||
Epoch uint64
|
||||
}
|
||||
|
||||
type commonContext struct {
|
||||
epoch uint64
|
||||
|
||||
ctrl *Controller
|
||||
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
type announcer struct {
|
||||
commonContext
|
||||
}
|
||||
|
||||
// Start starts the processing of container.SizeEstimation values.
|
||||
//
|
||||
// Single Start operation overtakes all data from LocalMetrics to
|
||||
// LocalAnnouncementTarget (Controller's parameters).
|
||||
// No filter by epoch is used for the iterator, since it is expected
|
||||
// that the source of metrics does not track the change of epochs.
|
||||
//
|
||||
// Each call acquires an announcement context for an Epoch parameter.
|
||||
// At the very end of the operation, the context is released.
|
||||
func (c *Controller) Start(ctx context.Context, prm StartPrm) {
|
||||
var announcer *announcer
|
||||
// acquire announcement
|
||||
ctx, announcer = c.acquireAnnouncement(ctx, prm)
|
||||
if announcer == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// finally stop and free the announcement
|
||||
defer announcer.freeAnnouncement()
|
||||
|
||||
// announce local values
|
||||
announcer.announce(ctx)
|
||||
}
|
||||
|
||||
func (c *announcer) announce(ctx context.Context) {
|
||||
c.log.Debug(logs.ControllerStartingToAnnounceTheValuesOfTheMetrics)
|
||||
|
||||
var (
|
||||
metricsIterator Iterator
|
||||
err error
|
||||
)
|
||||
|
||||
// initialize iterator over locally collected metrics
|
||||
metricsIterator, err = c.ctrl.prm.LocalMetrics.InitIterator()
|
||||
if err != nil {
|
||||
c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyCollectedMetrics,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// initialize target of local announcements
|
||||
targetWriter, err := c.ctrl.prm.LocalAnnouncementTarget.InitWriter(nil)
|
||||
if err != nil {
|
||||
c.log.Debug(logs.ControllerCouldNotInitializeAnnouncementAccumulator,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// iterate over all collected metrics and write them to the target
|
||||
err = metricsIterator.Iterate(
|
||||
func(container.SizeEstimation) bool {
|
||||
return true // local metrics don't know about epochs
|
||||
},
|
||||
func(a container.SizeEstimation) error {
|
||||
a.SetEpoch(c.epoch) // set epoch explicitly
|
||||
return targetWriter.Put(a)
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
c.log.Debug(logs.ControllerIteratorOverLocallyCollectedMetricsAborted,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// finish writing
|
||||
err = targetWriter.Close(ctx)
|
||||
if err != nil {
|
||||
c.log.Debug(logs.ControllerCouldNotFinishWritingLocalAnnouncements,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
c.log.Debug(logs.ControllerTrustAnnouncementSuccessfullyFinished)
|
||||
}
|
||||
|
||||
func (c *Controller) acquireAnnouncement(ctx context.Context, prm StartPrm) (context.Context, *announcer) {
|
||||
started := true
|
||||
c.announceMtx.Lock()
|
||||
{
|
||||
if cancel := c.mAnnounceCtx[prm.Epoch]; cancel == nil {
|
||||
ctx, cancel = context.WithCancel(ctx)
|
||||
c.mAnnounceCtx[prm.Epoch] = cancel
|
||||
started = false
|
||||
}
|
||||
}
|
||||
c.announceMtx.Unlock()
|
||||
|
||||
log := &logger.Logger{Logger: c.opts.log.With(
|
||||
zap.Uint64("epoch", prm.Epoch),
|
||||
)}
|
||||
|
||||
if started {
|
||||
log.Debug(logs.ControllerAnnouncementIsAlreadyStarted)
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
return ctx, &announcer{
|
||||
commonContext: commonContext{
|
||||
epoch: prm.Epoch,
|
||||
ctrl: c,
|
||||
log: log,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *commonContext) freeAnnouncement() {
|
||||
var stopped bool
|
||||
|
||||
c.ctrl.announceMtx.Lock()
|
||||
|
||||
{
|
||||
var cancel context.CancelFunc
|
||||
|
||||
cancel, stopped = c.ctrl.mAnnounceCtx[c.epoch]
|
||||
|
||||
if stopped {
|
||||
cancel()
|
||||
delete(c.ctrl.mAnnounceCtx, c.epoch)
|
||||
}
|
||||
}
|
||||
|
||||
c.ctrl.announceMtx.Unlock()
|
||||
|
||||
if stopped {
|
||||
c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted)
|
||||
} else {
|
||||
c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted)
|
||||
}
|
||||
}
|
||||
|
||||
// StopPrm groups the required parameters of the Controller.Stop method.
|
||||
type StopPrm struct {
|
||||
// Epoch number the analysis of the values of which must be interrupted.
|
||||
Epoch uint64
|
||||
}
|
||||
|
||||
type reporter struct {
|
||||
commonContext
|
||||
}
|
||||
|
||||
// Stop interrupts the processing of container.SizeEstimation values.
|
||||
//
|
||||
// Single Stop operation releases an announcement context and overtakes
|
||||
// all data from AnnouncementAccumulator to ResultReceiver (Controller's
|
||||
// parameters). Only values for the specified Epoch parameter are processed.
|
||||
//
|
||||
// Each call acquires a report context for an Epoch parameter.
|
||||
// At the very end of the operation, the context is released.
|
||||
func (c *Controller) Stop(ctx context.Context, prm StopPrm) {
|
||||
var reporter *reporter
|
||||
ctx, reporter = c.acquireReport(ctx, prm)
|
||||
if reporter == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// finally stop and free reporting
|
||||
defer reporter.freeReport()
|
||||
|
||||
// interrupt announcement
|
||||
reporter.freeAnnouncement()
|
||||
|
||||
// report the estimations
|
||||
reporter.report(ctx)
|
||||
}
|
||||
|
||||
func (c *Controller) acquireReport(ctx context.Context, prm StopPrm) (context.Context, *reporter) {
|
||||
started := true
|
||||
|
||||
c.reportMtx.Lock()
|
||||
{
|
||||
if cancel := c.mReportCtx[prm.Epoch]; cancel == nil {
|
||||
ctx, cancel = context.WithCancel(ctx)
|
||||
c.mReportCtx[prm.Epoch] = cancel
|
||||
started = false
|
||||
}
|
||||
}
|
||||
|
||||
c.reportMtx.Unlock()
|
||||
|
||||
log := &logger.Logger{Logger: c.opts.log.With(
|
||||
zap.Uint64("epoch", prm.Epoch),
|
||||
)}
|
||||
|
||||
if started {
|
||||
log.Debug(logs.ControllerReportIsAlreadyStarted)
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
return ctx, &reporter{
|
||||
commonContext: commonContext{
|
||||
epoch: prm.Epoch,
|
||||
ctrl: c,
|
||||
log: log,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *commonContext) freeReport() {
|
||||
var stopped bool
|
||||
|
||||
c.ctrl.reportMtx.Lock()
|
||||
|
||||
{
|
||||
var cancel context.CancelFunc
|
||||
|
||||
cancel, stopped = c.ctrl.mReportCtx[c.epoch]
|
||||
|
||||
if stopped {
|
||||
cancel()
|
||||
delete(c.ctrl.mReportCtx, c.epoch)
|
||||
}
|
||||
}
|
||||
|
||||
c.ctrl.reportMtx.Unlock()
|
||||
|
||||
if stopped {
|
||||
c.log.Debug(logs.ControllerAnnouncementSuccessfullyInterrupted)
|
||||
} else {
|
||||
c.log.Debug(logs.ControllerAnnouncementIsNotStartedOrAlreadyInterrupted)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *reporter) report(ctx context.Context) {
|
||||
var (
|
||||
localIterator Iterator
|
||||
err error
|
||||
)
|
||||
|
||||
// initialize iterator over locally accumulated announcements
|
||||
localIterator, err = c.ctrl.prm.AnnouncementAccumulator.InitIterator()
|
||||
if err != nil {
|
||||
c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocallyAccumulatedAnnouncements,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// initialize final destination of load estimations
|
||||
resultWriter, err := c.ctrl.prm.ResultReceiver.InitWriter(nil)
|
||||
if err != nil {
|
||||
c.log.Debug(logs.ControllerCouldNotInitializeResultTarget,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// iterate over all accumulated announcements and write them to the target
|
||||
err = localIterator.Iterate(
|
||||
usedSpaceFilterEpochEQ(c.epoch),
|
||||
resultWriter.Put,
|
||||
)
|
||||
if err != nil {
|
||||
c.log.Debug(logs.ControllerIteratorOverLocalAnnouncementsAborted,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// finish writing
|
||||
err = resultWriter.Close(ctx)
|
||||
if err != nil {
|
||||
c.log.Debug(logs.ControllerCouldNotFinishWritingLoadEstimations,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
|
@ -1,192 +0,0 @@
|
|||
package loadcontroller_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testAnnouncementStorage struct {
|
||||
w loadcontroller.Writer
|
||||
|
||||
i loadcontroller.Iterator
|
||||
|
||||
mtx sync.RWMutex
|
||||
|
||||
m map[uint64][]container.SizeEstimation
|
||||
}
|
||||
|
||||
func newTestStorage() *testAnnouncementStorage {
|
||||
return &testAnnouncementStorage{
|
||||
m: make(map[uint64][]container.SizeEstimation),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testAnnouncementStorage) InitIterator() (loadcontroller.Iterator, error) {
|
||||
if s.i != nil {
|
||||
return s.i, nil
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *testAnnouncementStorage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error {
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
for _, v := range s.m {
|
||||
for _, a := range v {
|
||||
if f(a) {
|
||||
if err := h(a); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *testAnnouncementStorage) InitWriter([]loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
|
||||
if s.w != nil {
|
||||
return s.w, nil
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *testAnnouncementStorage) Put(v container.SizeEstimation) error {
|
||||
s.mtx.Lock()
|
||||
s.m[v.Epoch()] = append(s.m[v.Epoch()], v)
|
||||
s.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *testAnnouncementStorage) Close(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func randAnnouncement() (a container.SizeEstimation) {
|
||||
a.SetContainer(cidtest.ID())
|
||||
a.SetValue(rand.Uint64())
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func TestSimpleScenario(t *testing.T) {
|
||||
// create storage to write final estimations
|
||||
resultStorage := newTestStorage()
|
||||
|
||||
// create storages to accumulate announcements
|
||||
accumulatingStorageN2 := newTestStorage()
|
||||
|
||||
// create storage of local metrics
|
||||
localStorageN1 := newTestStorage()
|
||||
localStorageN2 := newTestStorage()
|
||||
|
||||
// create 2 controllers: 1st writes announcements to 2nd, 2nd directly to final destination
|
||||
ctrlN1 := loadcontroller.New(loadcontroller.Prm{
|
||||
LocalMetrics: localStorageN1,
|
||||
AnnouncementAccumulator: newTestStorage(),
|
||||
LocalAnnouncementTarget: &testAnnouncementStorage{
|
||||
w: accumulatingStorageN2,
|
||||
},
|
||||
ResultReceiver: resultStorage,
|
||||
})
|
||||
|
||||
ctrlN2 := loadcontroller.New(loadcontroller.Prm{
|
||||
LocalMetrics: localStorageN2,
|
||||
AnnouncementAccumulator: accumulatingStorageN2,
|
||||
LocalAnnouncementTarget: &testAnnouncementStorage{
|
||||
w: resultStorage,
|
||||
},
|
||||
ResultReceiver: resultStorage,
|
||||
})
|
||||
|
||||
const processEpoch uint64 = 10
|
||||
|
||||
const goodNum = 4
|
||||
|
||||
// create 2 random values for processing epoch and 1 for some different
|
||||
announces := make([]container.SizeEstimation, 0, goodNum)
|
||||
|
||||
for i := 0; i < goodNum; i++ {
|
||||
a := randAnnouncement()
|
||||
a.SetEpoch(processEpoch)
|
||||
|
||||
announces = append(announces, a)
|
||||
}
|
||||
|
||||
// store one half of "good" announcements to 1st metrics storage, another - to 2nd
|
||||
// and "bad" to both
|
||||
for i := 0; i < goodNum/2; i++ {
|
||||
require.NoError(t, localStorageN1.Put(announces[i]))
|
||||
}
|
||||
|
||||
for i := goodNum / 2; i < goodNum; i++ {
|
||||
require.NoError(t, localStorageN2.Put(announces[i]))
|
||||
}
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(2)
|
||||
|
||||
startPrm := loadcontroller.StartPrm{
|
||||
Epoch: processEpoch,
|
||||
}
|
||||
|
||||
// start both controllers
|
||||
go func() {
|
||||
ctrlN1.Start(context.Background(), startPrm)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
ctrlN2.Start(context.Background(), startPrm)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
wg.Add(2)
|
||||
|
||||
stopPrm := loadcontroller.StopPrm{
|
||||
Epoch: processEpoch,
|
||||
}
|
||||
|
||||
// stop both controllers
|
||||
go func() {
|
||||
ctrlN1.Stop(context.Background(), stopPrm)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
ctrlN2.Stop(context.Background(), stopPrm)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// result target should contain all "good" announcements and shoult not container the "bad" one
|
||||
var res []container.SizeEstimation
|
||||
|
||||
err := resultStorage.Iterate(
|
||||
func(a container.SizeEstimation) bool {
|
||||
return true
|
||||
},
|
||||
func(a container.SizeEstimation) error {
|
||||
res = append(res, a)
|
||||
return nil
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
for i := range announces {
|
||||
require.Contains(t, res, announces[i])
|
||||
}
|
||||
}
|
|
@ -1,94 +0,0 @@
|
|||
package loadcontroller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Prm groups the required parameters of the Controller's constructor.
|
||||
//
|
||||
// All values must comply with the requirements imposed on them.
|
||||
// Passing incorrect parameter values will result in constructor
|
||||
// failure (error or panic depending on the implementation).
|
||||
type Prm struct {
|
||||
// Iterator over the used space values of the containers
|
||||
// collected by the node locally.
|
||||
LocalMetrics IteratorProvider
|
||||
|
||||
// Place of recording the local values of
|
||||
// the used space of containers.
|
||||
LocalAnnouncementTarget WriterProvider
|
||||
|
||||
// Iterator over the summarized used space scores
|
||||
// from the various network participants.
|
||||
AnnouncementAccumulator IteratorProvider
|
||||
|
||||
// Place of recording the final estimates of
|
||||
// the used space of containers.
|
||||
ResultReceiver WriterProvider
|
||||
}
|
||||
|
||||
// Controller represents main handler for starting
|
||||
// and interrupting container volume estimation.
|
||||
//
|
||||
// It binds the interfaces of the local value stores
|
||||
// to the target storage points. Controller is abstracted
|
||||
// from the internal storage device and the network location
|
||||
// of the connecting components. At its core, it is a
|
||||
// high-level start-stop trigger for calculations.
|
||||
//
|
||||
// For correct operation, the controller must be created
|
||||
// using the constructor (New) based on the required parameters
|
||||
// and optional components. After successful creation,
|
||||
// the constructor is immediately ready to work through
|
||||
// API of external control of calculations and data transfer.
|
||||
type Controller struct {
|
||||
prm Prm
|
||||
|
||||
opts *options
|
||||
|
||||
announceMtx sync.Mutex
|
||||
mAnnounceCtx map[uint64]context.CancelFunc
|
||||
|
||||
reportMtx sync.Mutex
|
||||
mReportCtx map[uint64]context.CancelFunc
|
||||
}
|
||||
|
||||
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
|
||||
|
||||
func panicOnPrmValue(n string, v any) {
|
||||
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
|
||||
}
|
||||
|
||||
// New creates a new instance of the Controller.
|
||||
//
|
||||
// Panics if at least one value of the parameters is invalid.
|
||||
//
|
||||
// The created Controller does not require additional
|
||||
// initialization and is completely ready for work.
|
||||
func New(prm Prm, opts ...Option) *Controller {
|
||||
switch {
|
||||
case prm.LocalMetrics == nil:
|
||||
panicOnPrmValue("LocalMetrics", prm.LocalMetrics)
|
||||
case prm.AnnouncementAccumulator == nil:
|
||||
panicOnPrmValue("AnnouncementAccumulator", prm.AnnouncementAccumulator)
|
||||
case prm.LocalAnnouncementTarget == nil:
|
||||
panicOnPrmValue("LocalAnnouncementTarget", prm.LocalAnnouncementTarget)
|
||||
case prm.ResultReceiver == nil:
|
||||
panicOnPrmValue("ResultReceiver", prm.ResultReceiver)
|
||||
}
|
||||
|
||||
o := defaultOpts()
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(o)
|
||||
}
|
||||
|
||||
return &Controller{
|
||||
prm: prm,
|
||||
opts: o,
|
||||
mAnnounceCtx: make(map[uint64]context.CancelFunc),
|
||||
mReportCtx: make(map[uint64]context.CancelFunc),
|
||||
}
|
||||
}
|
|
@ -1,103 +0,0 @@
|
|||
package loadcontroller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
)
|
||||
|
||||
// UsedSpaceHandler describes the signature of the container.SizeEstimation
|
||||
// value handling function.
|
||||
//
|
||||
// Termination of processing without failures is usually signaled
|
||||
// with a zero error, while a specific value may describe the reason
|
||||
// for failure.
|
||||
type UsedSpaceHandler func(container.SizeEstimation) error
|
||||
|
||||
// UsedSpaceFilter describes the signature of the function for
|
||||
// checking whether a value meets a certain criterion.
|
||||
//
|
||||
// Return of true means conformity, false - vice versa.
|
||||
type UsedSpaceFilter func(container.SizeEstimation) bool
|
||||
|
||||
// Iterator is a group of methods provided by entity
|
||||
// which can iterate over a group of container.SizeEstimation values.
|
||||
type Iterator interface {
|
||||
// Iterate must start an iterator over values that
|
||||
// meet the filter criterion (returns true).
|
||||
// For each such value should call a handler, the error
|
||||
// of which should be directly returned from the method.
|
||||
//
|
||||
// Internal failures of the iterator are also signaled via
|
||||
// an error. After a successful call to the last value
|
||||
// handler, nil should be returned.
|
||||
Iterate(UsedSpaceFilter, UsedSpaceHandler) error
|
||||
}
|
||||
|
||||
// IteratorProvider is a group of methods provided
|
||||
// by entity which generates iterators over
|
||||
// container.SizeEstimation values.
|
||||
type IteratorProvider interface {
|
||||
// InitIterator should return an initialized Iterator.
|
||||
//
|
||||
// Initialization problems are reported via error.
|
||||
// If no error was returned, then the Iterator must not be nil.
|
||||
//
|
||||
// Implementations can have different logic for different
|
||||
// contexts, so specific ones may document their own behavior.
|
||||
InitIterator() (Iterator, error)
|
||||
}
|
||||
|
||||
// Writer describes the interface for storing container.SizeEstimation values.
|
||||
//
|
||||
// This interface is provided by both local storage
|
||||
// of values and remote (wrappers over the RPC).
|
||||
type Writer interface {
|
||||
// Put performs a write operation of container.SizeEstimation value
|
||||
// and returns any error encountered.
|
||||
//
|
||||
// All values after the Close call must be flushed to the
|
||||
// physical target. Implementations can cache values before
|
||||
// Close operation.
|
||||
//
|
||||
// Put must not be called after Close.
|
||||
Put(container.SizeEstimation) error
|
||||
|
||||
// Close exits with method-providing Writer.
|
||||
//
|
||||
// All cached values must be flushed before
|
||||
// the Close's return.
|
||||
//
|
||||
// Methods must not be called after Close.
|
||||
Close(ctx context.Context) error
|
||||
}
|
||||
|
||||
// WriterProvider is a group of methods provided
|
||||
// by entity which generates keepers of
|
||||
// container.SizeEstimation values.
|
||||
type WriterProvider interface {
|
||||
// InitWriter should return an initialized Writer.
|
||||
//
|
||||
// Initialization problems are reported via error.
|
||||
// If no error was returned, then the Writer must not be nil.
|
||||
InitWriter(route []ServerInfo) (Writer, error)
|
||||
}
|
||||
|
||||
// ServerInfo describes a set of
|
||||
// characteristics of a point in a route.
|
||||
type ServerInfo interface {
|
||||
// PublicKey returns public key of the node
|
||||
// from the route in a binary representation.
|
||||
PublicKey() []byte
|
||||
|
||||
// Iterates over network addresses of the node
|
||||
// in the route. Breaks iterating on true return
|
||||
// of the handler.
|
||||
IterateAddresses(func(string) bool)
|
||||
|
||||
// Returns number of server's network addresses.
|
||||
NumberOfAddresses() int
|
||||
|
||||
// ExternalAddresses returns external node's addresses.
|
||||
ExternalAddresses() []string
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
package loadcontroller
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Option sets an optional parameter of Controller.
|
||||
type Option func(*options)
|
||||
|
||||
type options struct {
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultOpts() *options {
|
||||
return &options{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to specify logging component.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(o *options) {
|
||||
if l != nil {
|
||||
o.log = l
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,36 +0,0 @@
|
|||
package loadcontroller
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
)
|
||||
|
||||
func usedSpaceFilterEpochEQ(epoch uint64) UsedSpaceFilter {
|
||||
return func(a container.SizeEstimation) bool {
|
||||
return a.Epoch() == epoch
|
||||
}
|
||||
}
|
||||
|
||||
type storageWrapper struct {
|
||||
w Writer
|
||||
i Iterator
|
||||
}
|
||||
|
||||
func (s storageWrapper) InitIterator() (Iterator, error) {
|
||||
return s.i, nil
|
||||
}
|
||||
|
||||
func (s storageWrapper) InitWriter([]ServerInfo) (Writer, error) {
|
||||
return s.w, nil
|
||||
}
|
||||
|
||||
func SimpleIteratorProvider(i Iterator) IteratorProvider {
|
||||
return &storageWrapper{
|
||||
i: i,
|
||||
}
|
||||
}
|
||||
|
||||
func SimpleWriterProvider(w Writer) WriterProvider {
|
||||
return &storageWrapper{
|
||||
w: w,
|
||||
}
|
||||
}
|
|
@ -1,145 +0,0 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// InitWriter initializes and returns Writer that sends each value to its next route point.
|
||||
//
|
||||
// If route is present, then it is taken into account,
|
||||
// and the value will be sent to its continuation. Otherwise, the route will be laid
|
||||
// from scratch and the value will be sent to its primary point.
|
||||
//
|
||||
// After building a list of remote points of the next leg of the route, the value is sent
|
||||
// sequentially to all of them. If any transmissions (even all) fail, an error will not
|
||||
// be returned.
|
||||
//
|
||||
// Close of the composed Writer calls Close method on each internal Writer generated in
|
||||
// runtime and never returns an error.
|
||||
//
|
||||
// Always returns nil error.
|
||||
func (r *Router) InitWriter(route []loadcontroller.ServerInfo) (loadcontroller.Writer, error) {
|
||||
if len(route) == 0 {
|
||||
route = []loadcontroller.ServerInfo{r.localSrvInfo}
|
||||
}
|
||||
|
||||
return &loadWriter{
|
||||
router: r,
|
||||
route: route,
|
||||
mRoute: make(map[routeKey]*valuesRoute),
|
||||
mServers: make(map[string]loadcontroller.Writer),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type routeKey struct {
|
||||
epoch uint64
|
||||
|
||||
cid string
|
||||
}
|
||||
|
||||
type valuesRoute struct {
|
||||
route []loadcontroller.ServerInfo
|
||||
|
||||
values []container.SizeEstimation
|
||||
}
|
||||
|
||||
type loadWriter struct {
|
||||
router *Router
|
||||
|
||||
route []loadcontroller.ServerInfo
|
||||
|
||||
routeMtx sync.RWMutex
|
||||
mRoute map[routeKey]*valuesRoute
|
||||
|
||||
mServers map[string]loadcontroller.Writer
|
||||
}
|
||||
|
||||
func (w *loadWriter) Put(a container.SizeEstimation) error {
|
||||
w.routeMtx.Lock()
|
||||
defer w.routeMtx.Unlock()
|
||||
|
||||
key := routeKey{
|
||||
epoch: a.Epoch(),
|
||||
cid: a.Container().EncodeToString(),
|
||||
}
|
||||
|
||||
routeValues, ok := w.mRoute[key]
|
||||
if !ok {
|
||||
route, err := w.router.routeBuilder.NextStage(a, w.route)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(route) == 0 {
|
||||
route = []loadcontroller.ServerInfo{nil}
|
||||
}
|
||||
|
||||
routeValues = &valuesRoute{
|
||||
route: route,
|
||||
values: []container.SizeEstimation{a},
|
||||
}
|
||||
|
||||
w.mRoute[key] = routeValues
|
||||
}
|
||||
|
||||
for _, remoteInfo := range routeValues.route {
|
||||
var key string
|
||||
|
||||
if remoteInfo != nil {
|
||||
key = hex.EncodeToString(remoteInfo.PublicKey())
|
||||
}
|
||||
|
||||
remoteWriter, ok := w.mServers[key]
|
||||
if !ok {
|
||||
provider, err := w.router.remoteProvider.InitRemote(remoteInfo)
|
||||
if err != nil {
|
||||
w.router.log.Debug(logs.RouteCouldNotInitializeWriterProvider,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
continue // best effort
|
||||
}
|
||||
|
||||
remoteWriter, err = provider.InitWriter(w.route)
|
||||
if err != nil {
|
||||
w.router.log.Debug(logs.RouteCouldNotInitializeWriter,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
continue // best effort
|
||||
}
|
||||
|
||||
w.mServers[key] = remoteWriter
|
||||
}
|
||||
|
||||
err := remoteWriter.Put(a)
|
||||
if err != nil {
|
||||
w.router.log.Debug(logs.RouteCouldNotPutTheValue,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
// continue best effort
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *loadWriter) Close(ctx context.Context) error {
|
||||
for key, wRemote := range w.mServers {
|
||||
err := wRemote.Close(ctx)
|
||||
if err != nil {
|
||||
w.router.log.Debug(logs.RouteCouldNotCloseRemoteServerWriter,
|
||||
zap.String("key", key),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
)
|
||||
|
||||
// Builder groups methods to route values in the network.
|
||||
type Builder interface {
|
||||
// NextStage must return next group of route points for the value a
|
||||
// based on the passed route.
|
||||
//
|
||||
// Empty passed list means being at the starting point of the route.
|
||||
//
|
||||
// Must return empty list and no error if the endpoint of the route is reached.
|
||||
// If there are more than one point to go and the last passed point is included
|
||||
// in that list (means that point is the last point in one of the route groups),
|
||||
// returned route must contain nil point that should be interpreted as signal to,
|
||||
// among sending to other route points, save the announcement in that point.
|
||||
NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error)
|
||||
}
|
||||
|
||||
// RemoteWriterProvider describes the component
|
||||
// for sending values to a fixed route point.
|
||||
type RemoteWriterProvider interface {
|
||||
// InitRemote must return WriterProvider to the route point
|
||||
// corresponding to info.
|
||||
//
|
||||
// Nil info matches the end of the route.
|
||||
InitRemote(info loadcontroller.ServerInfo) (loadcontroller.WriterProvider, error)
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Option sets an optional parameter of Router.
|
||||
type Option func(*options)
|
||||
|
||||
type options struct {
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultOpts() *options {
|
||||
return &options{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns Option to specify logging component.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(o *options) {
|
||||
if l != nil {
|
||||
o.log = l
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,49 +0,0 @@
|
|||
package placementrouter
|
||||
|
||||
import "fmt"
|
||||
|
||||
// Prm groups the required parameters of the Builder's constructor.
|
||||
//
|
||||
// All values must comply with the requirements imposed on them.
|
||||
// Passing incorrect parameter values will result in constructor
|
||||
// failure (error or panic depending on the implementation).
|
||||
type Prm struct {
|
||||
// Calculator of the container members.
|
||||
//
|
||||
// Must not be nil.
|
||||
PlacementBuilder PlacementBuilder
|
||||
}
|
||||
|
||||
// Builder represents component that routes used container space
|
||||
// values between nodes from the container.
|
||||
//
|
||||
// For correct operation, Builder must be created using
|
||||
// the constructor (New) based on the required parameters
|
||||
// and optional components. After successful creation,
|
||||
// the Builder is immediately ready to work through API.
|
||||
type Builder struct {
|
||||
placementBuilder PlacementBuilder
|
||||
}
|
||||
|
||||
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
|
||||
|
||||
func panicOnPrmValue(n string, v any) {
|
||||
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
|
||||
}
|
||||
|
||||
// New creates a new instance of the Builder.
|
||||
//
|
||||
// Panics if at least one value of the parameters is invalid.
|
||||
//
|
||||
// The created Builder does not require additional
|
||||
// initialization and is completely ready for work.
|
||||
func New(prm Prm) *Builder {
|
||||
switch {
|
||||
case prm.PlacementBuilder == nil:
|
||||
panicOnPrmValue("PlacementBuilder", prm.PlacementBuilder)
|
||||
}
|
||||
|
||||
return &Builder{
|
||||
placementBuilder: prm.PlacementBuilder,
|
||||
}
|
||||
}
|
|
@ -1,47 +0,0 @@
|
|||
package placementrouter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
)
|
||||
|
||||
// NextStage composes container nodes for the container and epoch from a,
|
||||
// and returns the list of nodes with maximum weight (one from each vector).
|
||||
//
|
||||
// If passed route has more than one point, then endpoint of the route is reached.
|
||||
//
|
||||
// The traversed route is not checked, it is assumed to be correct.
|
||||
func (b *Builder) NextStage(a container.SizeEstimation, passed []loadcontroller.ServerInfo) ([]loadcontroller.ServerInfo, error) {
|
||||
if len(passed) > 1 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
cnr := a.Container()
|
||||
|
||||
placement, err := b.placementBuilder.BuildPlacement(a.Epoch(), cnr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not build placement %s: %w", cnr, err)
|
||||
}
|
||||
|
||||
res := make([]loadcontroller.ServerInfo, 0, len(placement))
|
||||
|
||||
for i := range placement {
|
||||
if len(placement[i]) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(passed) == 1 && bytes.Equal(passed[0].PublicKey(), placement[i][0].PublicKey()) {
|
||||
// add nil element so the announcement will be saved in local memory
|
||||
res = append(res, nil)
|
||||
} else {
|
||||
// add element with remote node to send announcement to
|
||||
res = append(res, netmapcore.Node(placement[i][0]))
|
||||
}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
|
@ -1,14 +0,0 @@
|
|||
package placementrouter
|
||||
|
||||
import (
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
)
|
||||
|
||||
// PlacementBuilder describes interface of FrostFS placement calculator.
|
||||
type PlacementBuilder interface {
|
||||
// BuildPlacement must compose and sort (according to a specific algorithm)
|
||||
// storage nodes from the container by its identifier using network map
|
||||
// of particular epoch.
|
||||
BuildPlacement(epoch uint64, cnr cid.ID) ([][]netmap.NodeInfo, error)
|
||||
}
|
|
@ -1,87 +0,0 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
)
|
||||
|
||||
// Prm groups the required parameters of the Router's constructor.
|
||||
//
|
||||
// All values must comply with the requirements imposed on them.
|
||||
// Passing incorrect parameter values will result in constructor
|
||||
// failure (error or panic depending on the implementation).
|
||||
type Prm struct {
|
||||
// Characteristics of the local node's server.
|
||||
//
|
||||
// Must not be nil.
|
||||
LocalServerInfo loadcontroller.ServerInfo
|
||||
|
||||
// Component for sending values to a fixed route point.
|
||||
//
|
||||
// Must not be nil.
|
||||
RemoteWriterProvider RemoteWriterProvider
|
||||
|
||||
// Route planner.
|
||||
//
|
||||
// Must not be nil.
|
||||
Builder Builder
|
||||
}
|
||||
|
||||
// Router represents component responsible for routing
|
||||
// used container space values over the network.
|
||||
//
|
||||
// For each fixed pair (container ID, epoch) there is a
|
||||
// single value route on the network. Router provides the
|
||||
// interface for writing values to the next point of the route.
|
||||
//
|
||||
// For correct operation, Router must be created using
|
||||
// the constructor (New) based on the required parameters
|
||||
// and optional components. After successful creation,
|
||||
// the Router is immediately ready to work through API.
|
||||
type Router struct {
|
||||
log *logger.Logger
|
||||
|
||||
remoteProvider RemoteWriterProvider
|
||||
|
||||
routeBuilder Builder
|
||||
|
||||
localSrvInfo loadcontroller.ServerInfo
|
||||
}
|
||||
|
||||
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
|
||||
|
||||
func panicOnPrmValue(n string, v any) {
|
||||
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
|
||||
}
|
||||
|
||||
// New creates a new instance of the Router.
|
||||
//
|
||||
// Panics if at least one value of the parameters is invalid.
|
||||
//
|
||||
// The created Router does not require additional
|
||||
// initialization and is completely ready for work.
|
||||
func New(prm Prm, opts ...Option) *Router {
|
||||
switch {
|
||||
case prm.RemoteWriterProvider == nil:
|
||||
panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider)
|
||||
case prm.Builder == nil:
|
||||
panicOnPrmValue("Builder", prm.Builder)
|
||||
case prm.LocalServerInfo == nil:
|
||||
panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo)
|
||||
}
|
||||
|
||||
o := defaultOpts()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](o)
|
||||
}
|
||||
|
||||
return &Router{
|
||||
log: o.log,
|
||||
remoteProvider: prm.RemoteWriterProvider,
|
||||
routeBuilder: prm.Builder,
|
||||
localSrvInfo: prm.LocalServerInfo,
|
||||
}
|
||||
}
|
|
@ -1,49 +0,0 @@
|
|||
package loadroute
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
|
||||
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
)
|
||||
|
||||
var errWrongRoute = errors.New("wrong route")
|
||||
|
||||
// CheckRoute checks if the route is a route correctly constructed by the builder for value a.
|
||||
//
|
||||
// Returns nil if route is correct, otherwise an error clarifying the inconsistency.
|
||||
func CheckRoute(builder Builder, a container.SizeEstimation, route []loadcontroller.ServerInfo) error {
|
||||
for i := 1; i < len(route); i++ {
|
||||
servers, err := builder.NextStage(a, route[:i])
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(servers) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
found := false
|
||||
|
||||
for j := range servers {
|
||||
if servers[j] == nil {
|
||||
// nil route point means that
|
||||
// (i-1)-th node in the route
|
||||
// must, among other things,
|
||||
// save the announcement to its
|
||||
// local memory
|
||||
continue
|
||||
}
|
||||
|
||||
if bytes.Equal(servers[j].PublicKey(), route[i].PublicKey()) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
return errWrongRoute
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,151 +0,0 @@
|
|||
package loadstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
loadcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/container/announcement/load/controller"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
)
|
||||
|
||||
type usedSpaceEstimations struct {
|
||||
announcement container.SizeEstimation
|
||||
|
||||
sizes []uint64
|
||||
}
|
||||
|
||||
type storageKey struct {
|
||||
epoch uint64
|
||||
|
||||
cid string
|
||||
}
|
||||
|
||||
// Storage represents in-memory storage of
|
||||
// container.SizeEstimation values.
|
||||
//
|
||||
// The write operation has the usual behavior - to save
|
||||
// the next number of used container space for a specific epoch.
|
||||
// All values related to one key (epoch, container ID) are stored
|
||||
// as a list.
|
||||
//
|
||||
// Storage also provides an iterator interface, into the handler
|
||||
// of which the final score is passed, built on all values saved
|
||||
// at the time of the call. Currently the only possible estimation
|
||||
// formula is used - the average between 10th and 90th percentile.
|
||||
//
|
||||
// For correct operation, Storage must be created
|
||||
// using the constructor (New) based on the required parameters
|
||||
// and optional components. After successful creation,
|
||||
// Storage is immediately ready to work through API.
|
||||
type Storage struct {
|
||||
mtx sync.RWMutex
|
||||
|
||||
mItems map[storageKey]*usedSpaceEstimations
|
||||
}
|
||||
|
||||
// Prm groups the required parameters of the Storage's constructor.
|
||||
//
|
||||
// The component is not parameterizable at the moment.
|
||||
type Prm struct{}
|
||||
|
||||
// New creates a new instance of the Storage.
|
||||
//
|
||||
// The created Storage does not require additional
|
||||
// initialization and is completely ready for work.
|
||||
func New(_ Prm) *Storage {
|
||||
return &Storage{
|
||||
mItems: make(map[storageKey]*usedSpaceEstimations),
|
||||
}
|
||||
}
|
||||
|
||||
// Put appends the next value of the occupied container space for the epoch
|
||||
// to the list of already saved values.
|
||||
//
|
||||
// Always returns nil error.
|
||||
func (s *Storage) Put(a container.SizeEstimation) error {
|
||||
s.mtx.Lock()
|
||||
|
||||
{
|
||||
key := storageKey{
|
||||
epoch: a.Epoch(),
|
||||
cid: a.Container().EncodeToString(),
|
||||
}
|
||||
|
||||
estimations, ok := s.mItems[key]
|
||||
if !ok {
|
||||
estimations = &usedSpaceEstimations{
|
||||
announcement: a,
|
||||
sizes: make([]uint64, 0, 1),
|
||||
}
|
||||
|
||||
s.mItems[key] = estimations
|
||||
}
|
||||
|
||||
estimations.sizes = append(estimations.sizes, a.Value())
|
||||
}
|
||||
|
||||
s.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) Close(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Iterate goes through all the lists with the key (container ID, epoch),
|
||||
// calculates the final grade for all values, and passes it to the handler.
|
||||
//
|
||||
// Final grade is the average between 10th and 90th percentiles.
|
||||
func (s *Storage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) (err error) {
|
||||
s.mtx.RLock()
|
||||
|
||||
{
|
||||
for _, v := range s.mItems {
|
||||
if f(v.announcement) {
|
||||
// calculate estimation based on 90th percentile
|
||||
v.announcement.SetValue(finalEstimation(v.sizes))
|
||||
|
||||
if err = h(v.announcement); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.mtx.RUnlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func finalEstimation(vals []uint64) uint64 {
|
||||
sort.Slice(vals, func(i, j int) bool {
|
||||
return vals[i] < vals[j]
|
||||
})
|
||||
|
||||
const (
|
||||
lowerRank = 10
|
||||
upperRank = 90
|
||||
)
|
||||
|
||||
if len(vals) >= lowerRank {
|
||||
lowerInd := percentile(lowerRank, vals)
|
||||
upperInd := percentile(upperRank, vals)
|
||||
|
||||
vals = vals[lowerInd:upperInd]
|
||||
}
|
||||
|
||||
sum := uint64(0)
|
||||
|
||||
for i := range vals {
|
||||
sum += vals[i]
|
||||
}
|
||||
|
||||
return sum / uint64(len(vals))
|
||||
}
|
||||
|
||||
func percentile(rank int, vals []uint64) int {
|
||||
p := len(vals) * rank / 100
|
||||
return p
|
||||
}
|
|
@ -1,50 +0,0 @@
|
|||
package loadstorage
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestStorage(t *testing.T) {
|
||||
const epoch uint64 = 13
|
||||
|
||||
var a container.SizeEstimation
|
||||
a.SetContainer(cidtest.ID())
|
||||
a.SetEpoch(epoch)
|
||||
|
||||
const opinionsNum = 100
|
||||
|
||||
s := New(Prm{})
|
||||
|
||||
opinions := make([]uint64, opinionsNum)
|
||||
for i := range opinions {
|
||||
opinions[i] = rand.Uint64()
|
||||
|
||||
a.SetValue(opinions[i])
|
||||
|
||||
require.NoError(t, s.Put(a))
|
||||
}
|
||||
|
||||
iterCounter := 0
|
||||
|
||||
err := s.Iterate(
|
||||
func(ai container.SizeEstimation) bool {
|
||||
return ai.Epoch() == epoch
|
||||
},
|
||||
func(ai container.SizeEstimation) error {
|
||||
iterCounter++
|
||||
|
||||
require.Equal(t, epoch, ai.Epoch())
|
||||
require.Equal(t, a.Container(), ai.Container())
|
||||
require.Equal(t, finalEstimation(opinions), ai.Value())
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, iterCounter)
|
||||
}
|
Loading…
Add table
Reference in a new issue