fix/355-increase-tree-service-client-cache-size #359

Closed
ale64bit wants to merge 156 commits from ale64bit/frostfs-node:fix/355-increase-tree-service-client-cache-size into support/v0.36
65 changed files with 15 additions and 4956 deletions
Showing only changes of commit 560f73ab7e - Show all commits

View file

@ -142,7 +142,6 @@ func setWorkersDefaults(cfg *viper.Viper) {
cfg.SetDefault("workers.frostfs", "10")
cfg.SetDefault("workers.container", "10")
cfg.SetDefault("workers.alphabet", "10")
cfg.SetDefault("workers.reputation", "10")
cfg.SetDefault("workers.subnet", "10")
}

View file

@ -55,8 +55,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
trustcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/controller"
truststorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/storage"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
@ -414,7 +412,6 @@ type cfg struct {
cfgNodeInfo cfgNodeInfo
cfgNetmap cfgNetmap
cfgControlService cfgControlService
cfgReputation cfgReputation
cfgObject cfgObject
cfgNotifications cfgNotifications
}
@ -452,8 +449,6 @@ type cfgMorph struct {
// TTL of Sidechain cached values. Non-positive value disables caching.
cacheTTL time.Duration
eigenTrustTicker *eigenTrustTickers // timers for EigenTrust iterations
proxyScriptHash neogoutil.Uint160
}
@ -532,16 +527,6 @@ type cfgControlService struct {
server *grpc.Server
}
type cfgReputation struct {
workerPool util.WorkerPool // pool for EigenTrust algorithm's iterations
localTrustStorage *truststorage.Storage
localTrustCtrl *trustcontroller.Controller
scriptHash neogoutil.Uint160
}
var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
func initCfg(appCfg *config.Config) *cfg {
@ -582,8 +567,6 @@ func initCfg(appCfg *config.Config) *cfg {
}
c.cfgObject = initCfgObject(appCfg)
c.cfgReputation = initReputation(appCfg)
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
c.metricsCollector = metrics.NewNodeMetrics()
@ -662,16 +645,6 @@ func initContainer(appCfg *config.Config) cfgContainer {
}
}
func initReputation(appCfg *config.Config) cfgReputation {
reputationWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
return cfgReputation{
scriptHash: contractsconfig.Reputation(appCfg),
workerPool: reputationWorkerPool,
}
}
func initCfgGRPC() cfgGRPC {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes

View file

@ -330,7 +330,7 @@ type remoteLoadAnnounceProvider struct {
netmapKeys netmapCore.AnnouncedKeys
clientCache interface {
Get(client.NodeInfo) (client.Client, error)
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
deadEndProvider loadcontroller.WriterProvider

View file

@ -102,7 +102,6 @@ func initApp(ctx context.Context, c *cfg) {
initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) })
initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) })
initAndLog(c, "session", initSessionService)
initAndLog(c, "reputation", func(c *cfg) { initReputationService(ctx, c) })
initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) })
initAndLog(c, "object", initObjectService)
initAndLog(c, "tree", initTreeService)

View file

@ -235,8 +235,6 @@ func listenMorphNotifications(ctx context.Context, c *cfg) {
zap.String("chain", "side"),
zap.Uint32("block_index", block.Index))
}
tickBlockTimers(c)
})
}
@ -285,7 +283,6 @@ func lookupScriptHashesInNNS(c *cfg) {
{&c.cfgNetmap.scriptHash, client.NNSNetmapContractName},
{&c.cfgAccounting.scriptHash, client.NNSBalanceContractName},
{&c.cfgContainer.scriptHash, client.NNSContainerContractName},
{&c.cfgReputation.scriptHash, client.NNSReputationContractName},
{&c.cfgMorph.proxyScriptHash, client.NNSProxyContractName},
}
)

View file

@ -1,7 +1,6 @@
package main
import (
"bytes"
"context"
"errors"
"fmt"
@ -12,7 +11,6 @@ import (
policerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/policer"
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -20,6 +18,7 @@ import (
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
objectTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/object/grpc"
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl"
@ -37,15 +36,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/policer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
truststorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/storage"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
)
@ -153,39 +147,12 @@ func (f *innerRingFetcherWithoutNotary) InnerRingKeys() ([][]byte, error) {
return result, nil
}
type coreClientConstructor reputationClientConstructor
func (x *coreClientConstructor) Get(info coreclient.NodeInfo) (coreclient.MultiAddressClient, error) {
c, err := (*reputationClientConstructor)(x).Get(info)
if err != nil {
return nil, err
}
return c.(coreclient.MultiAddressClient), nil
}
func initObjectService(c *cfg) {
keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore, c.cfgNetmap.state)
clientConstructor := &reputationClientConstructor{
log: c.log,
nmSrc: c.netMapSource,
netState: c.cfgNetmap.state,
trustStorage: c.cfgReputation.localTrustStorage,
basicConstructor: c.bgClientCache,
}
c.replicator = createReplicator(c, keyStorage, c.bgClientCache)
coreConstructor := &coreClientConstructor{
log: c.log,
nmSrc: c.netMapSource,
netState: c.cfgNetmap.state,
trustStorage: c.cfgReputation.localTrustStorage,
basicConstructor: c.clientCache,
}
c.replicator = createReplicator(c, keyStorage, clientConstructor)
addPolicer(c, keyStorage, clientConstructor)
addPolicer(c, keyStorage, c.bgClientCache)
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
@ -193,11 +160,11 @@ func initObjectService(c *cfg) {
sPutV2 := createPutSvcV2(sPut, keyStorage)
sSearch := createSearchSvc(c, keyStorage, traverseGen, coreConstructor)
sSearch := createSearchSvc(c, keyStorage, traverseGen, c.clientCache)
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, coreConstructor)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache)
*c.cfgObject.getSvc = *sGet // need smth better
@ -236,7 +203,7 @@ func initObjectService(c *cfg) {
}
}
func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *reputationClientConstructor) {
func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.ClientCache) {
ls := c.cfgObject.cfgLocalStorage.localStorage
pol := policer.New(
@ -288,7 +255,7 @@ func createInnerRingFetcher(c *cfg) v2.InnerRingFetcher {
}
}
func createReplicator(c *cfg, keyStorage *util.KeyStorage, clientConstructor *reputationClientConstructor) *replicator.Replicator {
func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCache) *replicator.Replicator {
ls := c.cfgObject.cfgLocalStorage.localStorage
return replicator.New(
@ -298,7 +265,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, clientConstructor *re
),
replicator.WithLocalStorage(ls),
replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, (*coreClientConstructor)(clientConstructor)),
putsvc.NewRemoteSender(keyStorage, cache),
),
)
}
@ -319,17 +286,9 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage) *putsvc.Service {
}
}
putConstructor := &coreClientConstructor{
log: c.log,
nmSrc: c.netMapSource,
netState: c.cfgNetmap.state,
trustStorage: c.cfgReputation.localTrustStorage,
basicConstructor: c.putClientCache,
}
return putsvc.NewService(
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(putConstructor),
putsvc.WithClientConstructor(c.putClientCache),
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
putsvc.WithObjectStorage(os),
putsvc.WithContainerSource(c.cfgObject.cnrSource),
@ -348,7 +307,7 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
)
}
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *coreClientConstructor) *searchsvc.Service {
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
return searchsvc.New(
@ -373,7 +332,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
}
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *coreClientConstructor) *getsvc.Service {
coreConstructor *cache.ClientCache) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
return getsvc.New(
@ -480,135 +439,6 @@ func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) {
return eaclInfo, nil
}
type reputationClientConstructor struct {
log *logger.Logger
nmSrc netmap.Source
netState netmap.State
trustStorage *truststorage.Storage
basicConstructor interface {
Get(coreclient.NodeInfo) (coreclient.Client, error)
}
}
type reputationClient struct {
coreclient.MultiAddressClient
prm truststorage.UpdatePrm
cons *reputationClientConstructor
}
func (c *reputationClient) submitResult(err error) {
currEpoch := c.cons.netState.CurrentEpoch()
sat := err == nil
c.cons.log.Debug(
"writing local reputation values",
zap.Uint64("epoch", currEpoch),
zap.Bool("satisfactory", sat),
)
prm := c.prm
prm.SetSatisfactory(sat)
prm.SetEpoch(currEpoch)
c.cons.trustStorage.Update(prm)
}
func (c *reputationClient) ObjectPutInit(ctx context.Context, prm client.PrmObjectPutInit) (*client.ObjectWriter, error) {
res, err := c.MultiAddressClient.ObjectPutInit(ctx, prm)
// FIXME: (neofs-node#1193) here we submit only initialization errors, writing errors are not processed
c.submitResult(err)
return res, err
}
func (c *reputationClient) ObjectDelete(ctx context.Context, prm client.PrmObjectDelete) (*client.ResObjectDelete, error) {
res, err := c.MultiAddressClient.ObjectDelete(ctx, prm)
if err != nil {
c.submitResult(err)
} else {
c.submitResult(apistatus.ErrFromStatus(res.Status()))
}
return res, err
}
func (c *reputationClient) GetObjectInit(ctx context.Context, prm client.PrmObjectGet) (*client.ObjectReader, error) {
res, err := c.MultiAddressClient.ObjectGetInit(ctx, prm)
// FIXME: (neofs-node#1193) here we submit only initialization errors, reading errors are not processed
c.submitResult(err)
return res, err
}
func (c *reputationClient) ObjectHead(ctx context.Context, prm client.PrmObjectHead) (*client.ResObjectHead, error) {
res, err := c.MultiAddressClient.ObjectHead(ctx, prm)
c.submitResult(err)
return res, err
}
func (c *reputationClient) ObjectHash(ctx context.Context, prm client.PrmObjectHash) (*client.ResObjectHash, error) {
res, err := c.MultiAddressClient.ObjectHash(ctx, prm)
c.submitResult(err)
return res, err
}
func (c *reputationClient) ObjectSearchInit(ctx context.Context, prm client.PrmObjectSearch) (*client.ObjectListReader, error) {
res, err := c.MultiAddressClient.ObjectSearchInit(ctx, prm)
// FIXME: (neofs-node#1193) here we submit only initialization errors, reading errors are not processed
c.submitResult(err)
return res, err
}
func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (coreclient.Client, error) {
cl, err := c.basicConstructor.Get(info)
if err != nil {
return nil, err
}
nm, err := netmap.GetLatestNetworkMap(c.nmSrc)
if err == nil {
key := info.PublicKey()
nmNodes := nm.Nodes()
var peer apireputation.PeerID
for i := range nmNodes {
if bytes.Equal(nmNodes[i].PublicKey(), key) {
peer.SetPublicKey(nmNodes[i].PublicKey())
prm := truststorage.UpdatePrm{}
prm.SetPeer(peer)
return &reputationClient{
MultiAddressClient: cl.(coreclient.MultiAddressClient),
prm: prm,
cons: c,
}, nil
}
}
} else {
c.log.Warn(logs.FrostFSNodeCouldNotGetLatestNetworkMapToOverloadTheClient,
zap.String("error", err.Error()),
)
}
return cl, nil
}
type engineWithNotifications struct {
base putsvc.ObjectStorage
nw notificationWriter

View file

@ -1,386 +0,0 @@
package main
import (
"context"
"fmt"
v2reputation "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation"
v2reputationgrpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/common"
intermediatereputation "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/intermediate"
localreputation "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/local"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/ticker"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
grpcreputation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/reputation/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
reputationrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common/router"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
eigentrustcalc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/calculator"
eigentrustctrl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/controller"
intermediateroutes "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/routes"
consumerstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/storage/consumers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/storage/daughters"
localtrustcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/controller"
localroutes "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/routes"
truststorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/storage"
reputationrpc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/rpc"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"go.uber.org/zap"
)
func initReputationService(ctx context.Context, c *cfg) {
wrap, err := repClient.NewFromMorph(c.cfgMorph.client, c.cfgReputation.scriptHash, 0, repClient.TryNotary())
fatalOnErr(err)
localKey := c.key.PublicKey().Bytes()
nmSrc := c.netMapSource
// storing calculated trusts as a daughter
c.cfgReputation.localTrustStorage = truststorage.New(
truststorage.Prm{},
)
daughterStorage := daughters.New(daughters.Prm{})
consumerStorage := consumerstorage.New(consumerstorage.Prm{})
localTrustLogger := &logger.Logger{Logger: c.log.With(zap.String("trust_type", "local"))}
managerBuilder := reputationcommon.NewManagerBuilder(
reputationcommon.ManagersPrm{
NetMapSource: nmSrc,
},
reputationcommon.WithLogger(c.log),
)
localRouteBuilder := localroutes.New(
localroutes.Prm{
ManagerBuilder: managerBuilder,
Log: localTrustLogger,
},
)
localTrustRouter := createLocalTrustRouter(c, localRouteBuilder, localTrustLogger, daughterStorage)
intermediateTrustRouter := createIntermediateTrustRouter(c, consumerStorage, managerBuilder)
eigenTrustController := createEigenTrustController(c, intermediateTrustRouter, localKey, wrap, daughterStorage, consumerStorage)
c.cfgReputation.localTrustCtrl = createLocalTrustController(c, localTrustLogger, localKey, localTrustRouter)
addReputationReportHandler(ctx, c)
server := grpcreputation.New(
reputationrpc.NewSignService(
&c.key.PrivateKey,
reputationrpc.NewResponseService(
&reputationServer{
cfg: c,
log: c.log,
localRouter: localTrustRouter,
intermediateRouter: intermediateTrustRouter,
routeBuilder: localRouteBuilder,
},
c.respSvc,
),
),
)
for _, srv := range c.cfgGRPC.servers {
v2reputationgrpc.RegisterReputationServiceServer(srv, server)
}
// initialize eigen trust block timer
newEigenTrustIterTimer(c)
addEigenTrustEpochHandler(ctx, c, eigenTrustController)
}
func addReputationReportHandler(ctx context.Context, c *cfg) {
addNewEpochAsyncNotificationHandler(
c,
func(ev event.Event) {
c.log.Debug(logs.FrostFSNodeStartReportingReputationOnNewEpochEvent)
var reportPrm localtrustcontroller.ReportPrm
// report collected values from previous epoch
reportPrm.SetEpoch(ev.(netmap.NewEpoch).EpochNumber() - 1)
c.cfgReputation.localTrustCtrl.Report(ctx, reportPrm)
},
)
}
func addEigenTrustEpochHandler(ctx context.Context, c *cfg, eigenTrustController *eigentrustctrl.Controller) {
addNewEpochAsyncNotificationHandler(
c,
func(e event.Event) {
epoch := e.(netmap.NewEpoch).EpochNumber()
log := c.log.With(zap.Uint64("epoch", epoch))
duration, err := c.cfgNetmap.wrapper.EpochDuration()
if err != nil {
log.Debug(logs.FrostFSNodeCouldNotFetchEpochDuration, zap.Error(err))
return
}
iterations, err := c.cfgNetmap.wrapper.EigenTrustIterations()
if err != nil {
log.Debug(logs.FrostFSNodeCouldNotFetchIterationNumber, zap.Error(err))
return
}
epochTimer, err := ticker.NewIterationsTicker(duration, iterations, func() {
eigenTrustController.Continue(ctx,
eigentrustctrl.ContinuePrm{
Epoch: epoch - 1,
},
)
})
if err != nil {
log.Debug(logs.FrostFSNodeCouldNotCreateFixedEpochTimer, zap.Error(err))
return
}
c.cfgMorph.eigenTrustTicker.addEpochTimer(epoch, epochTimer)
},
)
}
func createLocalTrustRouter(c *cfg, localRouteBuilder *localroutes.Builder, localTrustLogger *logger.Logger, daughterStorage *daughters.Storage) *reputationrouter.Router {
// storing received daughter(of current node) trusts as a manager
daughterStorageWriterProvider := &intermediatereputation.DaughterStorageWriterProvider{
Log: c.log,
Storage: daughterStorage,
}
remoteLocalTrustProvider := common.NewRemoteTrustProvider(
common.RemoteProviderPrm{
NetmapKeys: c,
DeadEndProvider: daughterStorageWriterProvider,
ClientCache: c.bgClientCache,
WriterProvider: localreputation.NewRemoteProvider(
localreputation.RemoteProviderPrm{
Key: &c.key.PrivateKey,
Log: localTrustLogger,
},
),
Log: localTrustLogger,
},
)
localTrustRouter := reputationrouter.New(
reputationrouter.Prm{
LocalServerInfo: c,
RemoteWriterProvider: remoteLocalTrustProvider,
Builder: localRouteBuilder,
},
reputationrouter.WithLogger(localTrustLogger))
return localTrustRouter
}
func createIntermediateTrustRouter(c *cfg, consumerStorage *consumerstorage.Storage, managerBuilder reputationcommon.ManagerBuilder) *reputationrouter.Router {
intermediateTrustLogger := &logger.Logger{Logger: c.log.With(zap.String("trust_type", "intermediate"))}
consumerStorageWriterProvider := &intermediatereputation.ConsumerStorageWriterProvider{
Log: c.log,
Storage: consumerStorage,
}
remoteIntermediateTrustProvider := common.NewRemoteTrustProvider(
common.RemoteProviderPrm{
NetmapKeys: c,
DeadEndProvider: consumerStorageWriterProvider,
ClientCache: c.bgClientCache,
WriterProvider: intermediatereputation.NewRemoteProvider(
intermediatereputation.RemoteProviderPrm{
Key: &c.key.PrivateKey,
Log: intermediateTrustLogger,
},
),
Log: intermediateTrustLogger,
},
)
intermediateRouteBuilder := intermediateroutes.New(
intermediateroutes.Prm{
ManagerBuilder: managerBuilder,
Log: intermediateTrustLogger,
},
)
intermediateTrustRouter := reputationrouter.New(
reputationrouter.Prm{
LocalServerInfo: c,
RemoteWriterProvider: remoteIntermediateTrustProvider,
Builder: intermediateRouteBuilder,
},
reputationrouter.WithLogger(intermediateTrustLogger),
)
return intermediateTrustRouter
}
func createEigenTrustController(c *cfg, intermediateTrustRouter *reputationrouter.Router, localKey []byte, wrap *repClient.Client,
daughterStorage *daughters.Storage, consumerStorage *consumerstorage.Storage) *eigentrustctrl.Controller {
eigenTrustCalculator := eigentrustcalc.New(
eigentrustcalc.Prm{
AlphaProvider: c.cfgNetmap.wrapper,
InitialTrustSource: intermediatereputation.InitialTrustSource{
NetMap: c.netMapSource,
},
IntermediateValueTarget: intermediateTrustRouter,
WorkerPool: c.cfgReputation.workerPool,
FinalResultTarget: intermediatereputation.NewFinalWriterProvider(
intermediatereputation.FinalWriterProviderPrm{
PrivatKey: &c.key.PrivateKey,
PubKey: localKey,
Client: wrap,
},
intermediatereputation.FinalWriterWithLogger(c.log),
),
DaughterTrustSource: &intermediatereputation.DaughterTrustIteratorProvider{
DaughterStorage: daughterStorage,
ConsumerStorage: consumerStorage,
},
},
eigentrustcalc.WithLogger(c.log),
)
eigenTrustController := eigentrustctrl.New(
eigentrustctrl.Prm{
DaughtersTrustCalculator: &intermediatereputation.DaughtersTrustCalculator{
Calculator: eigenTrustCalculator,
},
IterationsProvider: c.cfgNetmap.wrapper,
WorkerPool: c.cfgReputation.workerPool,
},
eigentrustctrl.WithLogger(c.log),
)
return eigenTrustController
}
func createLocalTrustController(c *cfg, localTrustLogger *logger.Logger, localKey []byte, localTrustRouter *reputationrouter.Router) *localtrustcontroller.Controller {
localTrustStorage := &localreputation.TrustStorage{
Log: localTrustLogger,
Storage: c.cfgReputation.localTrustStorage,
NmSrc: c.netMapSource,
LocalKey: localKey,
}
return localtrustcontroller.New(
localtrustcontroller.Prm{
LocalTrustSource: localTrustStorage,
LocalTrustTarget: localTrustRouter,
},
localtrustcontroller.WithLogger(c.log),
)
}
type reputationServer struct {
*cfg
log *logger.Logger
localRouter *reputationrouter.Router
intermediateRouter *reputationrouter.Router
routeBuilder reputationrouter.Builder
}
func (s *reputationServer) AnnounceLocalTrust(ctx context.Context, req *v2reputation.AnnounceLocalTrustRequest) (*v2reputation.AnnounceLocalTrustResponse, error) {
passedRoute := reverseRoute(req.GetVerificationHeader())
passedRoute = append(passedRoute, s)
body := req.GetBody()
ep := &common.EpochProvider{
E: body.GetEpoch(),
}
w, err := s.localRouter.InitWriter(reputationrouter.NewRouteInfo(ep, passedRoute))
if err != nil {
return nil, fmt.Errorf("could not initialize local trust writer: %w", err)
}
for _, trust := range body.GetTrusts() {
err = s.processLocalTrust(ctx, body.GetEpoch(), apiToLocalTrust(&trust, passedRoute[0].PublicKey()), passedRoute, w)
if err != nil {
return nil, fmt.Errorf("could not write one of local trusts: %w", err)
}
}
resp := new(v2reputation.AnnounceLocalTrustResponse)
resp.SetBody(new(v2reputation.AnnounceLocalTrustResponseBody))
return resp, nil
}
func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req *v2reputation.AnnounceIntermediateResultRequest) (*v2reputation.AnnounceIntermediateResultResponse, error) {
passedRoute := reverseRoute(req.GetVerificationHeader())
passedRoute = append(passedRoute, s)
body := req.GetBody()
ei := eigentrust.NewEpochIteration(body.GetEpoch(), body.GetIteration())
w, err := s.intermediateRouter.InitWriter(reputationrouter.NewRouteInfo(ei, passedRoute))
if err != nil {
return nil, fmt.Errorf("could not initialize trust writer: %w", err)
}
v2Trust := body.GetTrust()
trust := apiToLocalTrust(v2Trust.GetTrust(), v2Trust.GetTrustingPeer().GetPublicKey())
err = w.Write(ctx, trust)
if err != nil {
return nil, fmt.Errorf("could not write trust: %w", err)
}
resp := new(v2reputation.AnnounceIntermediateResultResponse)
resp.SetBody(new(v2reputation.AnnounceIntermediateResultResponseBody))
return resp, nil
}
func (s *reputationServer) processLocalTrust(ctx context.Context, epoch uint64, t reputation.Trust,
passedRoute []reputationcommon.ServerInfo, w reputationcommon.Writer) error {
err := reputationrouter.CheckRoute(s.routeBuilder, epoch, t, passedRoute)
if err != nil {
return fmt.Errorf("wrong route of reputation trust value: %w", err)
}
return w.Write(ctx, t)
}
// apiToLocalTrust converts v2 Trust to local reputation.Trust, adding trustingPeer.
func apiToLocalTrust(t *v2reputation.Trust, trustingPeer []byte) reputation.Trust {
var trusted, trusting apireputation.PeerID
trusted.SetPublicKey(t.GetPeer().GetPublicKey())
trusting.SetPublicKey(trustingPeer)
localTrust := reputation.Trust{}
localTrust.SetValue(reputation.TrustValueFromFloat64(t.GetValue()))
localTrust.SetPeer(trusted)
localTrust.SetTrustingPeer(trusting)
return localTrust
}
func reverseRoute(hdr *session.RequestVerificationHeader) (passedRoute []reputationcommon.ServerInfo) {
for hdr != nil {
passedRoute = append(passedRoute, &common.OnlyKeyRemoteServerInfo{
Key: hdr.GetBodySignature().GetKey(),
})
hdr = hdr.GetOrigin()
}
return
}

View file

@ -1,101 +0,0 @@
package common
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
trustcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/controller"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
type clientCache interface {
Get(client.NodeInfo) (client.Client, error)
}
// clientKeyRemoteProvider must provide a remote writer and take into account
// that requests must be sent via the passed api client and must be signed with
// the passed private key.
type clientKeyRemoteProvider interface {
WithClient(client.Client) reputationcommon.WriterProvider
}
// RemoteTrustProvider is an implementation of reputation RemoteWriterProvider interface.
// It caches clients, checks if it is the end of the route and checks either the current
// node is a remote target or not.
//
// remoteTrustProvider requires to be provided with clientKeyRemoteProvider.
type RemoteTrustProvider struct {
netmapKeys netmap.AnnouncedKeys
deadEndProvider reputationcommon.WriterProvider
clientCache clientCache
remoteProvider clientKeyRemoteProvider
log *logger.Logger
}
// RemoteProviderPrm groups the required parameters of the remoteTrustProvider's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type RemoteProviderPrm struct {
NetmapKeys netmap.AnnouncedKeys
DeadEndProvider reputationcommon.WriterProvider
ClientCache clientCache
WriterProvider clientKeyRemoteProvider
Log *logger.Logger
}
func NewRemoteTrustProvider(prm RemoteProviderPrm) *RemoteTrustProvider {
switch {
case prm.NetmapKeys == nil:
PanicOnPrmValue("NetmapKeys", prm.NetmapKeys)
case prm.DeadEndProvider == nil:
PanicOnPrmValue("DeadEndProvider", prm.DeadEndProvider)
case prm.ClientCache == nil:
PanicOnPrmValue("ClientCache", prm.ClientCache)
case prm.WriterProvider == nil:
PanicOnPrmValue("WriterProvider", prm.WriterProvider)
case prm.Log == nil:
PanicOnPrmValue("Logger", prm.Log)
}
return &RemoteTrustProvider{
netmapKeys: prm.NetmapKeys,
deadEndProvider: prm.DeadEndProvider,
clientCache: prm.ClientCache,
remoteProvider: prm.WriterProvider,
log: prm.Log,
}
}
func (rtp *RemoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (reputationcommon.WriterProvider, error) {
rtp.log.Debug(logs.CommonInitializingRemoteWriterProvider)
if srv == nil {
rtp.log.Debug(logs.CommonRouteHasReachedDeadendProvider)
return rtp.deadEndProvider, nil
}
if rtp.netmapKeys.IsLocalKey(srv.PublicKey()) {
// if local => return no-op writer
rtp.log.Debug(logs.CommonInitializingNoopWriterProvider)
return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil
}
var info client.NodeInfo
err := client.NodeInfoFromRawNetmapElement(&info, srv)
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
c, err := rtp.clientCache.Get(info)
if err != nil {
return nil, fmt.Errorf("could not initialize API client: %w", err)
}
return rtp.remoteProvider.WithClient(c), nil
}

View file

@ -1,53 +0,0 @@
package common
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
)
type EpochProvider struct {
E uint64
}
func (ep *EpochProvider) Epoch() uint64 {
return ep.E
}
type NopReputationWriter struct{}
func (NopReputationWriter) Write(context.Context, reputation.Trust) error {
return nil
}
func (NopReputationWriter) Close(context.Context) error {
return nil
}
// OnlyKeyRemoteServerInfo is an implementation of reputation.ServerInfo
// interface but with only public key data.
type OnlyKeyRemoteServerInfo struct {
Key []byte
}
func (i *OnlyKeyRemoteServerInfo) PublicKey() []byte {
return i.Key
}
func (*OnlyKeyRemoteServerInfo) IterateAddresses(func(string) bool) {
}
func (*OnlyKeyRemoteServerInfo) NumberOfAddresses() int {
return 0
}
func (*OnlyKeyRemoteServerInfo) ExternalAddresses() []string {
return nil
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func PanicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}

View file

@ -1,57 +0,0 @@
package intermediate
import (
"context"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
eigencalc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/calculator"
eigentrustctrl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/controller"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
)
// InitialTrustSource is an implementation of the
// reputation/eigentrust/calculator's InitialTrustSource interface.
type InitialTrustSource struct {
NetMap netmap.Source
}
var ErrEmptyNetMap = errors.New("empty NepMap")
// InitialTrust returns `initialTrust` as an initial trust value.
func (i InitialTrustSource) InitialTrust(apireputation.PeerID) (reputation.TrustValue, error) {
nm, err := i.NetMap.GetNetMap(1)
if err != nil {
return reputation.TrustZero, fmt.Errorf("failed to get NetMap: %w", err)
}
nodeCount := reputation.TrustValueFromFloat64(float64(len(nm.Nodes())))
if nodeCount == 0 {
return reputation.TrustZero, ErrEmptyNetMap
}
return reputation.TrustOne.Div(nodeCount), nil
}
// DaughtersTrustCalculator wraps EigenTrust calculator and implements the
// eigentrust/calculator's DaughtersTrustCalculator interface.
type DaughtersTrustCalculator struct {
Calculator *eigencalc.Calculator
}
// Calculate converts and passes values to the wrapped calculator.
func (c *DaughtersTrustCalculator) Calculate(ctx context.Context, iterCtx eigentrustctrl.IterationContext) {
calcPrm := eigencalc.CalculatePrm{}
epochIteration := eigentrust.EpochIteration{}
epochIteration.SetEpoch(iterCtx.Epoch())
epochIteration.SetI(iterCtx.I())
calcPrm.SetLast(iterCtx.Last())
calcPrm.SetEpochIteration(epochIteration)
c.Calculator.Calculate(ctx, calcPrm)
}

View file

@ -1,66 +0,0 @@
package intermediate
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
eigencalc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/calculator"
consumerstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/storage/consumers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
var ErrIncorrectContextPanicMsg = "could not write intermediate trust: passed context incorrect"
// ConsumerStorageWriterProvider is an implementation of the reputation.WriterProvider
// interface that provides ConsumerTrustWriter writer.
type ConsumerStorageWriterProvider struct {
Log *logger.Logger
Storage *consumerstorage.Storage
}
// ConsumerTrustWriter is an implementation of the reputation.Writer interface
// that writes passed consumer's Trust values to the Consumer storage. After writing
// that, values can be used in eigenTrust algorithm's iterations.
type ConsumerTrustWriter struct {
log *logger.Logger
storage *consumerstorage.Storage
iterInfo eigencalc.EpochIterationInfo
}
func (w *ConsumerTrustWriter) Write(_ context.Context, t reputation.Trust) error {
w.log.Debug(logs.IntermediateWritingReceivedConsumersTrusts,
zap.Uint64("epoch", w.iterInfo.Epoch()),
zap.Uint32("iteration", w.iterInfo.I()),
zap.Stringer("trusting_peer", t.TrustingPeer()),
zap.Stringer("trusted_peer", t.Peer()),
)
trust := eigentrust.IterationTrust{Trust: t}
trust.SetEpoch(w.iterInfo.Epoch())
trust.SetI(w.iterInfo.I())
w.storage.Put(trust)
return nil
}
func (w *ConsumerTrustWriter) Close(context.Context) error {
return nil
}
func (s *ConsumerStorageWriterProvider) InitWriter(ep reputationcommon.EpochProvider) (reputationcommon.Writer, error) {
iterInfo, ok := ep.(eigencalc.EpochIterationInfo)
if !ok {
panic(ErrIncorrectContextPanicMsg)
}
return &ConsumerTrustWriter{
log: s.Log,
storage: s.Storage,
iterInfo: iterInfo,
}, nil
}

View file

@ -1,147 +0,0 @@
package intermediate
import (
"crypto/ecdsa"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
eigentrustcalc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/calculator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"go.uber.org/zap"
)
// FinalWriterProviderPrm groups the required parameters of the FinalWriterProvider's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type FinalWriterProviderPrm struct {
PrivatKey *ecdsa.PrivateKey
PubKey []byte
Client *repClient.Client
}
// NewFinalWriterProvider creates a new instance of the FinalWriterProvider.
//
// Panics if at least one value of the parameters is invalid.
//
// The created FinalWriterProvider does not require additional
// initialization and is completely ready for work.
func NewFinalWriterProvider(prm FinalWriterProviderPrm, opts ...FinalWriterOption) *FinalWriterProvider {
o := defaultFinalWriterOptionsOpts()
for i := range opts {
opts[i](o)
}
return &FinalWriterProvider{
prm: prm,
opts: o,
}
}
// FinalWriterProvider is an implementation of the reputation.eigentrust.calculator
// IntermediateWriterProvider interface. It inits FinalWriter.
type FinalWriterProvider struct {
prm FinalWriterProviderPrm
opts *finalWriterOptions
}
func (fwp FinalWriterProvider) InitIntermediateWriter(
_ eigentrustcalc.EpochIterationInfo) (eigentrustcalc.IntermediateWriter, error) {
return &FinalWriter{
privatKey: fwp.prm.PrivatKey,
pubKey: fwp.prm.PubKey,
client: fwp.prm.Client,
l: fwp.opts.log,
}, nil
}
// FinalWriter is an implementation of the reputation.eigentrust.calculator IntermediateWriter
// interface that writes GlobalTrust to contract directly.
type FinalWriter struct {
privatKey *ecdsa.PrivateKey
pubKey []byte
client *repClient.Client
l *logger.Logger
}
func (fw FinalWriter) WriteIntermediateTrust(t eigentrust.IterationTrust) error {
fw.l.Debug(logs.IntermediateStartWritingGlobalTrustsToContract)
args := repClient.PutPrm{}
apiTrustedPeerID := t.Peer()
var apiTrust apireputation.Trust
apiTrust.SetValue(t.Value().Float64())
apiTrust.SetPeer(t.Peer())
var managerPublicKey [33]byte
copy(managerPublicKey[:], fw.pubKey)
var apiMangerPeerID apireputation.PeerID
apiMangerPeerID.SetPublicKey(managerPublicKey[:])
var gTrust apireputation.GlobalTrust
gTrust.SetTrust(apiTrust)
gTrust.SetManager(apiMangerPeerID)
err := gTrust.Sign(frostfsecdsa.Signer(*fw.privatKey))
if err != nil {
fw.l.Debug(
"failed to sign global trust",
zap.Error(err),
)
return fmt.Errorf("failed to sign global trust: %w", err)
}
args.SetEpoch(t.Epoch())
args.SetValue(gTrust)
args.SetPeerID(apiTrustedPeerID)
err = fw.client.Put(
args,
)
if err != nil {
fw.l.Debug(
"failed to write global trust to contract",
zap.Error(err),
)
return fmt.Errorf("failed to write global trust to contract: %w", err)
}
fw.l.Debug(
"sent global trust to contract",
zap.Uint64("epoch", t.Epoch()),
zap.Float64("value", t.Value().Float64()),
zap.Stringer("peer", t.Peer()),
)
return nil
}
type finalWriterOptions struct {
log *logger.Logger
}
type FinalWriterOption func(*finalWriterOptions)
func defaultFinalWriterOptionsOpts() *finalWriterOptions {
return &finalWriterOptions{
log: &logger.Logger{Logger: zap.L()},
}
}
func FinalWriterWithLogger(l *logger.Logger) FinalWriterOption {
return func(o *finalWriterOptions) {
if l != nil {
o.log = l
}
}
}

View file

@ -1,51 +0,0 @@
package intermediate
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/storage/daughters"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// DaughterStorageWriterProvider is an implementation of the reputation.WriterProvider
// interface that provides DaughterTrustWriter writer.
type DaughterStorageWriterProvider struct {
Log *logger.Logger
Storage *daughters.Storage
}
// DaughterTrustWriter is an implementation of the reputation.Writer interface
// that writes passed daughter's Trust values to Daughter storage. After writing
// that, values can be used in eigenTrust algorithm's iterations.
type DaughterTrustWriter struct {
log *logger.Logger
storage *daughters.Storage
ep reputationcommon.EpochProvider
}
func (w *DaughterTrustWriter) Write(_ context.Context, t reputation.Trust) error {
w.log.Debug(logs.IntermediateWritingReceivedDaughtersTrusts,
zap.Uint64("epoch", w.ep.Epoch()),
zap.Stringer("trusting_peer", t.TrustingPeer()),
zap.Stringer("trusted_peer", t.Peer()),
)
w.storage.Put(w.ep.Epoch(), t)
return nil
}
func (w *DaughterTrustWriter) Close(context.Context) error {
return nil
}
func (s *DaughterStorageWriterProvider) InitWriter(ep reputationcommon.EpochProvider) (reputationcommon.Writer, error) {
return &DaughterTrustWriter{
log: s.Log,
storage: s.Storage,
ep: ep,
}, nil
}

View file

@ -1,125 +0,0 @@
package intermediate
import (
"context"
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/common"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
eigentrustcalc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/calculator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
reputationapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"go.uber.org/zap"
)
// RemoteProviderPrm groups the required parameters of the RemoteProvider's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type RemoteProviderPrm struct {
Key *ecdsa.PrivateKey
Log *logger.Logger
}
// NewRemoteProvider creates a new instance of the RemoteProvider.
//
// Panics if at least one value of the parameters is invalid.
//
// The created RemoteProvider does not require additional
// initialization and is completely ready for work.
func NewRemoteProvider(prm RemoteProviderPrm) *RemoteProvider {
switch {
case prm.Key == nil:
common.PanicOnPrmValue("NetMapSource", prm.Key)
case prm.Log == nil:
common.PanicOnPrmValue("Logger", prm.Log)
}
return &RemoteProvider{
key: prm.Key,
log: prm.Log,
}
}
// RemoteProvider is an implementation of the clientKeyRemoteProvider interface.
type RemoteProvider struct {
key *ecdsa.PrivateKey
log *logger.Logger
}
func (rp RemoteProvider) WithClient(c coreclient.Client) reputationcommon.WriterProvider {
return &TrustWriterProvider{
client: c,
key: rp.key,
log: rp.log,
}
}
type TrustWriterProvider struct {
client coreclient.Client
key *ecdsa.PrivateKey
log *logger.Logger
}
func (twp *TrustWriterProvider) InitWriter(ep reputationcommon.EpochProvider) (reputationcommon.Writer, error) {
iterInfo, ok := ep.(eigentrustcalc.EpochIterationInfo)
if !ok {
// TODO: #1164 think if this can be done without such limitation
panic(ErrIncorrectContextPanicMsg)
}
return &RemoteTrustWriter{
iterInfo: iterInfo,
client: twp.client,
key: twp.key,
log: twp.log,
}, nil
}
type RemoteTrustWriter struct {
iterInfo eigentrustcalc.EpochIterationInfo
client coreclient.Client
key *ecdsa.PrivateKey
log *logger.Logger
}
// Write sends a trust value to a remote node via ReputationService.AnnounceIntermediateResult RPC.
func (rtp *RemoteTrustWriter) Write(ctx context.Context, t reputation.Trust) error {
epoch := rtp.iterInfo.Epoch()
i := rtp.iterInfo.I()
rtp.log.Debug(logs.IntermediateAnnouncingTrust,
zap.Uint64("epoch", epoch),
zap.Uint32("iteration", i),
zap.Stringer("trusting_peer", t.TrustingPeer()),
zap.Stringer("trusted_peer", t.Peer()),
)
var apiTrust reputationapi.Trust
apiTrust.SetValue(t.Value().Float64())
apiTrust.SetPeer(t.Peer())
var apiPeerToPeerTrust reputationapi.PeerToPeerTrust
apiPeerToPeerTrust.SetTrustingPeer(t.TrustingPeer())
apiPeerToPeerTrust.SetTrust(apiTrust)
var p internalclient.AnnounceIntermediatePrm
p.SetClient(rtp.client)
p.SetEpoch(epoch)
p.SetIteration(i)
p.SetTrust(apiPeerToPeerTrust)
_, err := internalclient.AnnounceIntermediate(ctx, p)
return err
}
func (rtp *RemoteTrustWriter) Close(context.Context) error {
return nil
}

View file

@ -1,64 +0,0 @@
package intermediate
import (
"fmt"
eigentrustcalc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/calculator"
consumerstorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/storage/consumers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/storage/daughters"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
)
// DaughterTrustIteratorProvider is an implementation of the
// reputation/eigentrust/calculator's DaughterTrustIteratorProvider interface.
type DaughterTrustIteratorProvider struct {
DaughterStorage *daughters.Storage
ConsumerStorage *consumerstorage.Storage
}
// InitDaughterIterator returns an iterator over the received
// local trusts for ctx.Epoch() epoch from daughter p.
func (ip *DaughterTrustIteratorProvider) InitDaughterIterator(ctx eigentrustcalc.EpochIterationInfo,
p apireputation.PeerID) (eigentrustcalc.TrustIterator, error) {
epoch := ctx.Epoch()
daughterIterator, ok := ip.DaughterStorage.DaughterTrusts(epoch, p)
if !ok {
return nil, fmt.Errorf("no data in %d epoch for daughter: %s", epoch, p)
}
return daughterIterator, nil
}
// InitAllDaughtersIterator returns an iterator over all
// daughters of the current node(manager) and all local
// trusts received from them for ctx.Epoch() epoch.
func (ip *DaughterTrustIteratorProvider) InitAllDaughtersIterator(
ctx eigentrustcalc.EpochIterationInfo) (eigentrustcalc.PeerTrustsIterator, error) {
epoch := ctx.Epoch()
iter, ok := ip.DaughterStorage.AllDaughterTrusts(epoch)
if !ok {
return nil, fmt.Errorf("no data in %d epoch for daughters", epoch)
}
return iter, nil
}
// InitConsumersIterator returns an iterator over all daughters
// of the current node(manager) and all their consumers' local
// trusts for ctx.Epoch() epoch and ctx.I() iteration.
func (ip *DaughterTrustIteratorProvider) InitConsumersIterator(
ctx eigentrustcalc.EpochIterationInfo) (eigentrustcalc.PeerTrustsIterator, error) {
epoch, iter := ctx.Epoch(), ctx.I()
consumerIterator, ok := ip.ConsumerStorage.Consumers(epoch, iter)
if !ok {
return nil, fmt.Errorf("no data for %d iteration in %d epoch for consumers's trusts",
iter,
epoch,
)
}
return consumerIterator, nil
}

View file

@ -1,101 +0,0 @@
package internal
import (
"context"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
)
type commonPrm struct {
cli coreclient.Client
}
// SetClient sets the base client for FrostFS API communication.
//
// Required parameter.
func (x *commonPrm) SetClient(cli coreclient.Client) {
x.cli = cli
}
// AnnounceLocalPrm groups parameters of AnnounceLocal operation.
type AnnounceLocalPrm struct {
commonPrm
cliPrm client.PrmAnnounceLocalTrust
}
// SetEpoch sets the epoch in which the trust was assessed.
func (x *AnnounceLocalPrm) SetEpoch(epoch uint64) {
x.cliPrm.SetEpoch(epoch)
}
// SetTrusts sets a list of local trust values.
func (x *AnnounceLocalPrm) SetTrusts(ts []reputation.Trust) {
x.cliPrm.SetValues(ts)
}
// AnnounceLocalRes groups the resulting values of AnnounceLocal operation.
type AnnounceLocalRes struct{}
// AnnounceLocal sends estimations of local trust to the remote node.
//
// Client, context and key must be set.
//
// Returns any error which prevented the operation from completing correctly in error return.
func AnnounceLocal(ctx context.Context, prm AnnounceLocalPrm) (res AnnounceLocalRes, err error) {
var cliRes *client.ResAnnounceLocalTrust
cliRes, err = prm.cli.AnnounceLocalTrust(ctx, prm.cliPrm)
if err == nil {
// pull out an error from status
err = apistatus.ErrFromStatus(cliRes.Status())
}
return
}
// AnnounceIntermediatePrm groups parameters of AnnounceIntermediate operation.
type AnnounceIntermediatePrm struct {
commonPrm
cliPrm client.PrmAnnounceIntermediateTrust
}
// SetEpoch sets the number of the epoch when the trust calculation's iteration was executed.
func (x *AnnounceIntermediatePrm) SetEpoch(epoch uint64) {
x.cliPrm.SetEpoch(epoch)
}
// SetIteration sets the number of the iteration of the trust calculation algorithm.
func (x *AnnounceIntermediatePrm) SetIteration(iter uint32) {
x.cliPrm.SetIteration(iter)
}
// SetTrust sets the current global trust value computed at the iteration.
func (x *AnnounceIntermediatePrm) SetTrust(t reputation.PeerToPeerTrust) {
x.cliPrm.SetCurrentValue(t)
}
// AnnounceIntermediateRes groups the resulting values of AnnounceIntermediate operation.
type AnnounceIntermediateRes struct{}
// AnnounceIntermediate sends the global trust value calculated at the specified iteration
// and epoch to to the remote node.
//
// Client, context and key must be set.
//
// Returns any error which prevented the operation from completing correctly in error return.
func AnnounceIntermediate(ctx context.Context, prm AnnounceIntermediatePrm) (res AnnounceIntermediateRes, err error) {
var cliRes *client.ResAnnounceIntermediateTrust
cliRes, err = prm.cli.AnnounceIntermediateTrust(ctx, prm.cliPrm)
if err == nil {
// pull out an error from status
err = apistatus.ErrFromStatus(cliRes.Status())
}
return
}

View file

@ -1,11 +0,0 @@
// Package internal provides functionality for FrostFS Node Reputation system communication with FrostFS network.
// The base client for accessing remote nodes via FrostFS API is a FrostFS SDK Go API client.
// However, although it encapsulates a useful piece of business logic (e.g. the signature mechanism),
// the Reputation service does not fully use the client's flexible interface.
//
// In this regard, this package provides functions over base API client necessary for the application.
// This allows you to concentrate the entire spectrum of the client's use in one place (this will be convenient
// both when updating the base client and for evaluating the UX of SDK library). So, it is expected that all
// Reputation service packages will be limited to this package for the development of functionality requiring
// FrostFS API communication.
package internal

View file

@ -1,113 +0,0 @@
package local
import (
"context"
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/common"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
reputationapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"go.uber.org/zap"
)
// RemoteProviderPrm groups the required parameters of the RemoteProvider's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type RemoteProviderPrm struct {
Key *ecdsa.PrivateKey
Log *logger.Logger
}
// NewRemoteProvider creates a new instance of the RemoteProvider.
//
// Panics if at least one value of the parameters is invalid.
//
// The created RemoteProvider does not require additional
// initialization and is completely ready for work.
func NewRemoteProvider(prm RemoteProviderPrm) *RemoteProvider {
switch {
case prm.Key == nil:
common.PanicOnPrmValue("NetMapSource", prm.Key)
case prm.Log == nil:
common.PanicOnPrmValue("Logger", prm.Log)
}
return &RemoteProvider{
key: prm.Key,
log: prm.Log,
}
}
// RemoteProvider is an implementation of the clientKeyRemoteProvider interface.
type RemoteProvider struct {
key *ecdsa.PrivateKey
log *logger.Logger
}
func (rp RemoteProvider) WithClient(c coreclient.Client) reputationcommon.WriterProvider {
return &TrustWriterProvider{
client: c,
key: rp.key,
log: rp.log,
}
}
type TrustWriterProvider struct {
client coreclient.Client
key *ecdsa.PrivateKey
log *logger.Logger
}
func (twp *TrustWriterProvider) InitWriter(ep reputationcommon.EpochProvider) (reputationcommon.Writer, error) {
return &RemoteTrustWriter{
ep: ep,
client: twp.client,
key: twp.key,
log: twp.log,
}, nil
}
type RemoteTrustWriter struct {
ep reputationcommon.EpochProvider
client coreclient.Client
key *ecdsa.PrivateKey
log *logger.Logger
buf []reputationapi.Trust
}
func (rtp *RemoteTrustWriter) Write(_ context.Context, t reputation.Trust) error {
var apiTrust reputationapi.Trust
apiTrust.SetValue(t.Value().Float64())
apiTrust.SetPeer(t.Peer())
rtp.buf = append(rtp.buf, apiTrust)
return nil
}
func (rtp *RemoteTrustWriter) Close(ctx context.Context) error {
epoch := rtp.ep.Epoch()
rtp.log.Debug(logs.LocalAnnouncingTrusts,
zap.Uint64("epoch", epoch),
)
var prm internalclient.AnnounceLocalPrm
prm.SetClient(rtp.client)
prm.SetEpoch(epoch)
prm.SetTrusts(rtp.buf)
_, err := internalclient.AnnounceLocal(ctx, prm)
return err
}

View file

@ -1,108 +0,0 @@
package local
import (
"bytes"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
trustcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/controller"
truststorage "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/storage"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"go.uber.org/zap"
)
type TrustStorage struct {
Log *logger.Logger
Storage *truststorage.Storage
NmSrc netmapcore.Source
LocalKey []byte
}
func (s *TrustStorage) InitIterator(ep reputationcommon.EpochProvider) (trustcontroller.Iterator, error) {
epoch := ep.Epoch()
s.Log.Debug(logs.LocalInitializingIteratorOverTrusts,
zap.Uint64("epoch", epoch),
)
epochStorage, err := s.Storage.DataForEpoch(epoch)
if err != nil && !errors.Is(err, truststorage.ErrNoPositiveTrust) {
return nil, err
}
return &TrustIterator{
ep: ep,
storage: s,
epochStorage: epochStorage,
}, nil
}
type TrustIterator struct {
ep reputationcommon.EpochProvider
storage *TrustStorage
epochStorage *truststorage.EpochTrustValueStorage
}
func (it *TrustIterator) Iterate(h reputation.TrustHandler) error {
if it.epochStorage != nil {
err := it.epochStorage.Iterate(h)
if !errors.Is(err, truststorage.ErrNoPositiveTrust) {
return err
}
}
nm, err := it.storage.NmSrc.GetNetMapByEpoch(it.ep.Epoch())
if err != nil {
return err
}
// find out if local node is presented in netmap
localIndex := -1
nmNodes := nm.Nodes()
for i := range nmNodes {
if bytes.Equal(nmNodes[i].PublicKey(), it.storage.LocalKey) {
localIndex = i
break
}
}
ln := len(nmNodes)
if localIndex >= 0 && ln > 0 {
ln--
}
// calculate Pj http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Chapter 4.5.
p := reputation.TrustOne.Div(reputation.TrustValueFromInt(ln))
for i := range nmNodes {
if i == localIndex {
continue
}
var trusted, trusting apireputation.PeerID
trusted.SetPublicKey(nmNodes[i].PublicKey())
trusting.SetPublicKey(it.storage.LocalKey)
trust := reputation.Trust{}
trust.SetPeer(trusted)
trust.SetValue(p)
trust.SetTrustingPeer(trusting)
if err := h(trust); err != nil {
return err
}
}
return nil
}

View file

@ -1,90 +0,0 @@
package ticker
import (
"fmt"
"sync"
)
// IterationHandler is a callback of a certain block advance.
type IterationHandler func()
// IterationsTicker represents a fixed tick number block timer.
//
// It can tick the blocks and perform certain actions
// on block time intervals.
type IterationsTicker struct {
m sync.Mutex
curr uint64
period uint64
times uint64
h IterationHandler
}
// NewIterationsTicker creates a new IterationsTicker.
//
// It guaranties that a handler would be called the
// specified amount of times in the specified amount
// of blocks. After the last meaningful Tick, IterationsTicker
// becomes no-op timer.
//
// Returns an error only if times is greater than totalBlocks.
func NewIterationsTicker(totalBlocks uint64, times uint64, h IterationHandler) (*IterationsTicker, error) {
period := totalBlocks / times
if period == 0 {
return nil, fmt.Errorf("impossible to tick %d times in %d blocks",
times, totalBlocks,
)
}
var curr uint64
// try to make handler calls as rare as possible
if totalBlocks%times != 0 {
extraBlocks := (period+1)*times - totalBlocks
if period >= extraBlocks {
curr = extraBlocks + (period-extraBlocks)/2
period++
}
}
return &IterationsTicker{
curr: curr,
period: period,
times: times,
h: h,
}, nil
}
// Tick ticks one block in the IterationsTicker.
//
// Returns `false` if the timer has finished its operations
// and there will be no more handler calls.
// Calling Tick after the returned `false` is safe, no-op
// and also returns `false`.
func (ft *IterationsTicker) Tick() bool {
ft.m.Lock()
defer ft.m.Unlock()
if ft.times == 0 {
return false
}
ft.curr++
if ft.curr%ft.period == 0 {
ft.h()
ft.times--
if ft.times == 0 {
return false
}
}
return true
}

View file

@ -1,118 +0,0 @@
package ticker
import (
"errors"
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
func TestFixedTimer_Tick(t *testing.T) {
tests := [...]struct {
duration uint64
times uint64
err error
}{
{
duration: 20,
times: 4,
err: nil,
},
{
duration: 6,
times: 6,
err: nil,
},
{
duration: 10,
times: 6,
err: nil,
},
{
duration: 5,
times: 6,
err: errors.New("impossible to tick 6 times in 5 blocks"),
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("duration:%d,times:%d", test.duration, test.times), func(t *testing.T) {
counter := uint64(0)
timer, err := NewIterationsTicker(test.duration, test.times, func() {
counter++
})
if test.err != nil {
require.EqualError(t, err, test.err.Error())
return
}
require.NoError(t, err)
for i := 0; i < int(test.duration); i++ {
if !timer.Tick() {
break
}
}
require.Equal(t, false, timer.Tick())
require.Equal(t, test.times, counter)
})
}
}
func TestFixedTimer_RareCalls(t *testing.T) {
tests := [...]struct {
duration uint64
times uint64
firstCall uint64
period uint64
}{
{
duration: 11,
times: 6,
firstCall: 1,
period: 2,
},
{
duration: 11,
times: 4,
firstCall: 2,
period: 3,
},
{
duration: 20,
times: 3,
firstCall: 4,
period: 7,
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("duration:%d,times:%d", test.duration, test.times), func(t *testing.T) {
var counter uint64
timer, err := NewIterationsTicker(test.duration, test.times, func() {
counter++
})
require.NoError(t, err)
checked := false
for i := 1; i <= int(test.duration); i++ {
if !timer.Tick() {
break
}
if !checked && counter == 1 {
require.Equal(t, test.firstCall, uint64(i))
checked = true
}
}
require.Equal(t, false, timer.Tick())
require.Equal(t, test.times, counter)
})
}
}

View file

@ -1,43 +0,0 @@
package main
import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/ticker"
)
type eigenTrustTickers struct {
m sync.Mutex
timers map[uint64]*ticker.IterationsTicker
}
func (e *eigenTrustTickers) addEpochTimer(epoch uint64, timer *ticker.IterationsTicker) {
e.m.Lock()
defer e.m.Unlock()
e.timers[epoch] = timer
}
func (e *eigenTrustTickers) tick() {
e.m.Lock()
defer e.m.Unlock()
for epoch, t := range e.timers {
if !t.Tick() {
delete(e.timers, epoch)
}
}
}
func tickBlockTimers(c *cfg) {
c.cfgMorph.eigenTrustTicker.tick()
}
func newEigenTrustIterTimer(c *cfg) {
c.cfgMorph.eigenTrustTicker = &eigenTrustTickers{
// it is expected to have max 2 concurrent epoch
// in normal mode work
timers: make(map[uint64]*ticker.IterationsTicker, 2),
}
}

View file

@ -18,7 +18,6 @@ import (
addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress"
statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
subnetvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/subnet"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement"
auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
@ -29,13 +28,11 @@ import (
frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
morphsubnet "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/subnet"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager"
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server"
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
@ -467,33 +464,6 @@ func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *
return bindMainnetProcessor(frostfsProcessor, s)
}
func (s *Server) initReputationProcessor(cfg *viper.Viper, sidechainFee fixedn.Fixed8) error {
repClient, err := repClient.NewFromMorph(s.morphClient, s.contracts.reputation, sidechainFee, repClient.TryNotary(), repClient.AsAlphabet())
if err != nil {
return err
}
// create reputation processor
reputationProcessor, err := reputation.New(&reputation.Params{
Log: s.log,
PoolSize: cfg.GetInt("workers.reputation"),
EpochState: s,
AlphabetState: s,
ReputationWrapper: repClient,
ManagerBuilder: reputationcommon.NewManagerBuilder(
reputationcommon.ManagersPrm{
NetMapSource: s.netmapClient,
},
),
NotaryDisabled: s.sideNotaryConfig.disabled,
})
if err != nil {
return err
}
return bindMorphProcessor(reputationProcessor, s)
}
func (s *Server) initGRPCServer(cfg *viper.Viper) error {
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint == "" {
@ -620,8 +590,6 @@ type serverProcessors struct {
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) {
result := &serverProcessors{}
fee := s.feeConfig.SideChainFee()
irf := s.createIRFetcher()
s.statusIndex = newInnerRingIndexer(
@ -681,11 +649,6 @@ func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClien
return nil, err
}
err = s.initReputationProcessor(cfg, fee)
if err != nil {
return nil, err
}
return result, nil
}

View file

@ -1,29 +0,0 @@
package reputation
import (
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
reputationEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/reputation"
"go.uber.org/zap"
)
func (rp *Processor) handlePutReputation(ev event.Event) {
put := ev.(reputationEvent.Put)
peerID := put.PeerID()
// FIXME: #1147 do not use `ToV2` method outside frostfs-api-go library
rp.log.Info(logs.Notification,
zap.String("type", "reputation put"),
zap.String("peer_id", hex.EncodeToString(peerID.PublicKey())))
// send event to the worker pool
err := rp.pool.Submit(func() { rp.processPut(&put) })
if err != nil {
// there system can be moved into controlled degradation stage
rp.log.Warn(logs.ReputationReputationWorkerPoolDrained,
zap.Int("capacity", rp.pool.Cap()))
}
}

View file

@ -1,99 +0,0 @@
package reputation
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
reputationEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/reputation"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"go.uber.org/zap"
)
var errWrongManager = errors.New("got manager that is incorrect for peer")
func (rp *Processor) processPut(e *reputationEvent.Put) {
if !rp.alphabetState.IsAlphabet() {
rp.log.Info(logs.ReputationNonAlphabetModeIgnoreReputationPutNotification)
return
}
epoch := e.Epoch()
id := e.PeerID()
value := e.Value()
// check if epoch is valid
currentEpoch := rp.epochState.EpochCounter()
if epoch >= currentEpoch {
rp.log.Info(logs.ReputationIgnoreReputationValue,
zap.String("reason", "invalid epoch number"),
zap.Uint64("trust_epoch", epoch),
zap.Uint64("local_epoch", currentEpoch))
return
}
// check signature
if !value.VerifySignature() {
rp.log.Info(logs.ReputationIgnoreReputationValue,
zap.String("reason", "invalid signature"),
)
return
}
// check if manager is correct
if err := rp.checkManagers(epoch, value.Manager(), id); err != nil {
rp.log.Info(logs.ReputationIgnoreReputationValue,
zap.String("reason", "wrong manager"),
zap.String("error", err.Error()))
return
}
rp.approvePutReputation(e)
}
func (rp *Processor) checkManagers(e uint64, mng apireputation.PeerID, peer apireputation.PeerID) error {
mm, err := rp.mngBuilder.BuildManagers(e, peer)
if err != nil {
return fmt.Errorf("could not build managers: %w", err)
}
for _, m := range mm {
// FIXME: #1147 do not use `ToV2` method outside frostfs-api-go library
if bytes.Equal(mng.PublicKey(), m.PublicKey()) {
return nil
}
}
return errWrongManager
}
func (rp *Processor) approvePutReputation(e *reputationEvent.Put) {
var (
id = e.PeerID()
err error
)
if nr := e.NotaryRequest(); nr != nil {
// put event was received via Notary service
err = rp.reputationWrp.Morph().NotarySignAndInvokeTX(nr.MainTransaction)
} else {
args := repClient.PutPrm{}
args.SetEpoch(e.Epoch())
args.SetPeerID(id)
args.SetValue(e.Value())
err = rp.reputationWrp.Put(args)
}
if err != nil {
// FIXME: #1147 do not use `ToV2` method outside frostfs-api-go library
rp.log.Warn(logs.ReputationCantSendApprovalTxForReputationValue,
zap.String("peer_id", hex.EncodeToString(id.PublicKey())),
zap.String("error", err.Error()))
}
}

View file

@ -1,156 +0,0 @@
package reputation
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
reputationEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
type (
// EpochState is a callback interface for inner ring global state.
EpochState interface {
EpochCounter() uint64
}
// AlphabetState is a callback interface for inner ring global state.
AlphabetState interface {
IsAlphabet() bool
}
// Processor of events produced by reputation contract.
Processor struct {
log *logger.Logger
pool *ants.Pool
epochState EpochState
alphabetState AlphabetState
reputationWrp *repClient.Client
mngBuilder common.ManagerBuilder
notaryDisabled bool
}
// Params of the processor constructor.
Params struct {
Log *logger.Logger
PoolSize int
EpochState EpochState
AlphabetState AlphabetState
ReputationWrapper *repClient.Client
ManagerBuilder common.ManagerBuilder
NotaryDisabled bool
}
)
const (
putReputationNotification = "reputationPut"
)
// New creates reputation contract processor instance.
func New(p *Params) (*Processor, error) {
switch {
case p.Log == nil:
return nil, errors.New("ir/reputation: logger is not set")
case p.EpochState == nil:
return nil, errors.New("ir/reputation: global state is not set")
case p.AlphabetState == nil:
return nil, errors.New("ir/reputation: global state is not set")
case p.ReputationWrapper == nil:
return nil, errors.New("ir/reputation: reputation contract wrapper is not set")
case p.ManagerBuilder == nil:
return nil, errors.New("ir/reputation: manager builder is not set")
}
p.Log.Debug(logs.ReputationReputationWorkerPool, zap.Int("size", p.PoolSize))
pool, err := ants.NewPool(p.PoolSize, ants.WithNonblocking(true))
if err != nil {
return nil, fmt.Errorf("ir/reputation: can't create worker pool: %w", err)
}
return &Processor{
log: p.Log,
pool: pool,
epochState: p.EpochState,
alphabetState: p.AlphabetState,
reputationWrp: p.ReputationWrapper,
mngBuilder: p.ManagerBuilder,
notaryDisabled: p.NotaryDisabled,
}, nil
}
// ListenerNotificationParsers for the 'event.Listener' event producer.
func (rp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
if !rp.notaryDisabled {
return nil
}
var parsers []event.NotificationParserInfo
// put reputation event
put := event.NotificationParserInfo{}
put.SetType(putReputationNotification)
put.SetScriptHash(rp.reputationWrp.ContractAddress())
put.SetParser(reputationEvent.ParsePut)
parsers = append(parsers, put)
return parsers
}
// ListenerNotificationHandlers for the 'event.Listener' event producer.
func (rp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
if !rp.notaryDisabled {
return nil
}
var handlers []event.NotificationHandlerInfo
// put reputation handler
put := event.NotificationHandlerInfo{}
put.SetType(putReputationNotification)
put.SetScriptHash(rp.reputationWrp.ContractAddress())
put.SetHandler(rp.handlePutReputation)
handlers = append(handlers, put)
return handlers
}
// ListenerNotaryParsers for the 'event.Listener' notary event producer.
func (rp *Processor) ListenerNotaryParsers() []event.NotaryParserInfo {
var p event.NotaryParserInfo
p.SetMempoolType(mempoolevent.TransactionAdded)
p.SetRequestType(reputationEvent.PutNotaryEvent)
p.SetScriptHash(rp.reputationWrp.ContractAddress())
p.SetParser(reputationEvent.ParsePutNotary)
return []event.NotaryParserInfo{p}
}
// ListenerNotaryHandlers for the 'event.Listener' notary event producer.
func (rp *Processor) ListenerNotaryHandlers() []event.NotaryHandlerInfo {
var h event.NotaryHandlerInfo
h.SetMempoolType(mempoolevent.TransactionAdded)
h.SetRequestType(reputationEvent.PutNotaryEvent)
h.SetScriptHash(rp.reputationWrp.ContractAddress())
h.SetHandler(rp.handlePutReputation)
return []event.NotaryHandlerInfo{h}
}
// TimersHandlers for the 'Timers' event producer.
func (rp *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil
}

View file

@ -27,7 +27,7 @@ type (
ClientCache struct {
log *logger.Logger
cache interface {
Get(clientcore.NodeInfo) (clientcore.Client, error)
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
CloseAll()
}
key *ecdsa.PrivateKey

View file

@ -38,7 +38,7 @@ func NewSDKClientCache(opts ClientCacheOpts) *ClientCache {
}
// Get function returns existing client or creates a new one.
func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.Client, error) {
func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.MultiAddressClient, error) {
netAddr := info.AddressGroup()
if c.opts.AllowExternal {
netAddr = append(netAddr, info.ExternalAddressGroup()...)

View file

@ -1,50 +0,0 @@
package grpcreputation
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation"
reputation2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation/grpc"
reputationrpc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/rpc"
)
// Server wraps FrostFS API v2 Reputation service server
// and provides gRPC Reputation service server interface.
type Server struct {
srv reputationrpc.Server
}
// New creates, initializes and returns Server instance.
func New(srv reputationrpc.Server) *Server {
return &Server{
srv: srv,
}
}
func (s *Server) AnnounceLocalTrust(ctx context.Context, r *reputation2.AnnounceLocalTrustRequest) (*reputation2.AnnounceLocalTrustResponse, error) {
req := new(reputation.AnnounceLocalTrustRequest)
if err := req.FromGRPCMessage(r); err != nil {
return nil, err
}
resp, err := s.srv.AnnounceLocalTrust(ctx, req)
if err != nil {
return nil, err
}
return resp.ToGRPCMessage().(*reputation2.AnnounceLocalTrustResponse), nil
}
func (s *Server) AnnounceIntermediateResult(ctx context.Context, r *reputation2.AnnounceIntermediateResultRequest) (*reputation2.AnnounceIntermediateResultResponse, error) {
req := new(reputation.AnnounceIntermediateResultRequest)
if err := req.FromGRPCMessage(r); err != nil {
return nil, err
}
resp, err := s.srv.AnnounceIntermediateResult(ctx, req)
if err != nil {
return nil, err
}
return resp.ToGRPCMessage().(*reputation2.AnnounceIntermediateResultResponse), nil
}

View file

@ -15,7 +15,7 @@ import (
)
type ClientConstructor interface {
Get(clientcore.NodeInfo) (clientcore.Client, error)
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
}
// RemoteHeader represents utility for getting

View file

@ -1,78 +0,0 @@
package common
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
)
type EpochProvider interface {
// Must return epoch number to select the values.
Epoch() uint64
}
// Writer describes the interface for storing reputation.Trust values.
//
// This interface is provided by both local storage
// of values and remote (wrappers over the RPC).
type Writer interface {
// Write performs a write operation of reputation.Trust value
// and returns any error encountered.
//
// All values after the Close call must be flushed to the
// physical target. Implementations can cache values before
// Close operation.
//
// Write must not be called after Close.
Write(context.Context, reputation.Trust) error
// Close exits with method-providing Writer.
//
// All cached values must be flushed before
// the Close's return.
//
// Methods must not be called after Close.
Close(context.Context) error
}
// WriterProvider is a group of methods provided
// by entity which generates keepers of
// reputation.Trust values.
type WriterProvider interface {
// InitWriter should return an initialized Writer.
//
// Initialization problems are reported via error.
// If no error was returned, then the Writer must not be nil.
//
// Implementations can have different logic for different
// contexts, so specific ones may document their own behavior.
InitWriter(EpochProvider) (Writer, error)
}
// ManagerBuilder defines an interface for providing a list
// of Managers for specific epoch. Implementation depends on trust value.
type ManagerBuilder interface {
// BuildManagers must compose list of managers. It depends on
// particular epoch and PeerID of the current route point.
BuildManagers(epoch uint64, p apireputation.PeerID) ([]ServerInfo, error)
}
// ServerInfo describes a set of
// characteristics of a point in a route.
type ServerInfo interface {
// PublicKey returns public key of the node
// from the route in a binary representation.
PublicKey() []byte
// Iterates over network addresses of the node
// in the route. Breaks iterating on true return
// of the handler.
IterateAddresses(func(string) bool)
// Returns number of server's network addresses.
NumberOfAddresses() int
// ExternalAddresses returns external addresses of a node.
ExternalAddresses() []string
}

View file

@ -1,133 +0,0 @@
package common
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apiNetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"git.frostfs.info/TrueCloudLab/hrw"
"go.uber.org/zap"
)
// managerBuilder is implementation of reputation ManagerBuilder interface.
// It sorts nodes in NetMap with HRW algorithms and
// takes the next node after the current one as the only manager.
type managerBuilder struct {
log *logger.Logger
nmSrc netmapcore.Source
opts *mngOptions
}
// ManagersPrm groups the required parameters of the managerBuilder's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type ManagersPrm struct {
NetMapSource netmapcore.Source
}
// NewManagerBuilder creates a new instance of the managerBuilder.
//
// Panics if at least one value of the parameters is invalid.
//
// The created managerBuilder does not require additional
// initialization and is completely ready for work.
func NewManagerBuilder(prm ManagersPrm, opts ...MngOption) ManagerBuilder {
switch {
case prm.NetMapSource == nil:
panic(fmt.Sprintf("invalid NetMapSource (%T):%v", prm.NetMapSource, prm.NetMapSource))
}
o := defaultMngOpts()
for i := range opts {
opts[i](o)
}
return &managerBuilder{
log: o.log,
nmSrc: prm.NetMapSource,
opts: o,
}
}
// implements Server on apiNetmap.NodeInfo.
type nodeServer apiNetmap.NodeInfo
func (x nodeServer) PublicKey() []byte {
return (apiNetmap.NodeInfo)(x).PublicKey()
}
func (x nodeServer) IterateAddresses(f func(string) bool) {
(apiNetmap.NodeInfo)(x).IterateNetworkEndpoints(f)
}
func (x nodeServer) NumberOfAddresses() int {
return (apiNetmap.NodeInfo)(x).NumberOfNetworkEndpoints()
}
func (x nodeServer) ExternalAddresses() []string {
return (apiNetmap.NodeInfo)(x).ExternalAddresses()
}
// BuildManagers sorts nodes in NetMap with HRW algorithms and
// takes the next node after the current one as the only manager.
func (mb *managerBuilder) BuildManagers(epoch uint64, p apireputation.PeerID) ([]ServerInfo, error) {
mb.log.Debug(logs.CommonStartBuildingManagers,
zap.Uint64("epoch", epoch),
zap.Stringer("peer", p),
)
nm, err := mb.nmSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, err
}
nmNodes := nm.Nodes()
// make a copy to keep order consistency of the origin netmap after sorting
nodes := make([]apiNetmap.NodeInfo, len(nmNodes))
copy(nodes, nmNodes)
hrw.SortHasherSliceByValue(nodes, epoch)
for i := range nodes {
if apireputation.ComparePeerKey(p, nodes[i].PublicKey()) {
managerIndex := i + 1
if managerIndex == len(nodes) {
managerIndex = 0
}
return []ServerInfo{nodeServer(nodes[managerIndex])}, nil
}
}
return nil, nil
}
type mngOptions struct {
log *logger.Logger
}
type MngOption func(*mngOptions)
func defaultMngOpts() *mngOptions {
return &mngOptions{
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns MngOption to specify logging component.
func WithLogger(l *logger.Logger) MngOption {
return func(o *mngOptions) {
if l != nil {
o.log = l
}
}
}

View file

@ -1,139 +0,0 @@
package router
import (
"context"
"encoding/hex"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"go.uber.org/zap"
)
// RouteInfo wraps epoch provider with additional passed
// route data. It is only used inside Router and is
// not passed in any external methods.
type RouteInfo struct {
common.EpochProvider
passedRoute []common.ServerInfo
}
// NewRouteInfo wraps the main context of value passing with its traversal route and epoch.
func NewRouteInfo(ep common.EpochProvider, passed []common.ServerInfo) *RouteInfo {
return &RouteInfo{
EpochProvider: ep,
passedRoute: passed,
}
}
type trustWriter struct {
router *Router
routeInfo *RouteInfo
routeMtx sync.RWMutex
mServers map[string]common.Writer
}
// InitWriter initializes and returns Writer that sends each value to its next route point.
//
// If ep was created by NewRouteInfo, then the traversed route is taken into account,
// and the value will be sent to its continuation. Otherwise, the route will be laid
// from scratch and the value will be sent to its primary point.
//
// After building a list of remote points of the next leg of the route, the value is sent
// sequentially to all of them. If any transmissions (even all) fail, an error will not
// be returned.
//
// Close of the composed Writer calls Close method on each internal Writer generated in
// runtime and never returns an error.
//
// Always returns nil error.
func (r *Router) InitWriter(ep common.EpochProvider) (common.Writer, error) {
var (
routeInfo *RouteInfo
ok bool
)
if routeInfo, ok = ep.(*RouteInfo); !ok {
routeInfo = &RouteInfo{
EpochProvider: ep,
passedRoute: []common.ServerInfo{r.localSrvInfo},
}
}
return &trustWriter{
router: r,
routeInfo: routeInfo,
mServers: make(map[string]common.Writer),
}, nil
}
func (w *trustWriter) Write(ctx context.Context, t reputation.Trust) error {
w.routeMtx.Lock()
defer w.routeMtx.Unlock()
route, err := w.router.routeBuilder.NextStage(w.routeInfo.Epoch(), t, w.routeInfo.passedRoute)
if err != nil {
return err
} else if len(route) == 0 {
route = []common.ServerInfo{nil}
}
for _, remoteInfo := range route {
var key string
if remoteInfo != nil {
key = hex.EncodeToString(remoteInfo.PublicKey())
}
remoteWriter, ok := w.mServers[key]
if !ok {
provider, err := w.router.remoteProvider.InitRemote(remoteInfo)
if err != nil {
w.router.log.Debug(logs.RouterCouldNotInitializeWriterProvider,
zap.String("error", err.Error()),
)
continue
}
// init writer with original context wrapped in routeContext
remoteWriter, err = provider.InitWriter(w.routeInfo.EpochProvider)
if err != nil {
w.router.log.Debug(logs.RouterCouldNotInitializeWriter,
zap.String("error", err.Error()),
)
continue
}
w.mServers[key] = remoteWriter
}
err := remoteWriter.Write(ctx, t)
if err != nil {
w.router.log.Debug(logs.RouterCouldNotWriteTheValue,
zap.String("error", err.Error()),
)
}
}
return nil
}
func (w *trustWriter) Close(ctx context.Context) error {
for key, wRemote := range w.mServers {
err := wRemote.Close(ctx)
if err != nil {
w.router.log.Debug(logs.RouterCouldNotCloseRemoteServerWriter,
zap.String("key", key),
zap.String("error", err.Error()),
)
}
}
return nil
}

View file

@ -1,28 +0,0 @@
package router
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
)
// Builder groups methods to route values in the network.
type Builder interface {
// NextStage must return next group of route points
// for passed epoch and trust values.
// Implementation must take into account already passed route points.
//
// Empty passed list means being at the starting point of the route.
//
// Must return empty list and no error if the endpoint of the route is reached.
NextStage(epoch uint64, t reputation.Trust, passed []common.ServerInfo) ([]common.ServerInfo, error)
}
// RemoteWriterProvider describes the component
// for sending values to a fixed route point.
type RemoteWriterProvider interface {
// InitRemote must return WriterProvider to the route point
// corresponding to info.
//
// Nil info matches the end of the route.
InitRemote(info common.ServerInfo) (common.WriterProvider, error)
}

View file

@ -1,28 +0,0 @@
package router
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of Router.
type Option func(*options)
type options struct {
log *logger.Logger
}
func defaultOpts() *options {
return &options{
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns Option to specify logging component.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
if l != nil {
o.log = l
}
}
}

View file

@ -1,81 +0,0 @@
package router
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
// Prm groups the required parameters of the Router's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Characteristics of the local node's server.
//
// Must not be nil.
LocalServerInfo common.ServerInfo
// Component for sending values to a fixed route point.
//
// Must not be nil.
RemoteWriterProvider RemoteWriterProvider
// Route planner.
//
// Must not be nil.
Builder Builder
}
// Router represents component responsible for routing
// local trust values over the network.
//
// For each fixed pair (node peer, epoch) there is a
// single value route on the network. Router provides the
// interface for writing values to the next point of the route.
//
// For correct operation, Router must be created using
// the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Router is immediately ready to work through API.
type Router struct {
log *logger.Logger
remoteProvider RemoteWriterProvider
routeBuilder Builder
localSrvInfo common.ServerInfo
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
func New(prm Prm, opts ...Option) *Router {
switch {
case prm.RemoteWriterProvider == nil:
panicOnPrmValue("RemoteWriterProvider", prm.RemoteWriterProvider)
case prm.Builder == nil:
panicOnPrmValue("Builder", prm.Builder)
case prm.LocalServerInfo == nil:
panicOnPrmValue("LocalServerInfo", prm.LocalServerInfo)
}
o := defaultOpts()
for i := range opts {
opts[i](o)
}
return &Router{
log: o.log,
remoteProvider: prm.RemoteWriterProvider,
routeBuilder: prm.Builder,
localSrvInfo: prm.LocalServerInfo,
}
}

View file

@ -1,40 +0,0 @@
package router
import (
"bytes"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
)
var errWrongRoute = errors.New("wrong route")
// CheckRoute checks if the route is a route correctly constructed by the builder for value a.
//
// Returns nil if route is correct, otherwise an error clarifying the inconsistency.
func CheckRoute(builder Builder, epoch uint64, t reputation.Trust, route []common.ServerInfo) error {
for i := 1; i < len(route); i++ {
servers, err := builder.NextStage(epoch, t, route[:i])
if err != nil {
return err
} else if len(servers) == 0 {
break
}
found := false
for j := range servers {
if bytes.Equal(servers[j].PublicKey(), route[i].PublicKey()) {
found = true
break
}
}
if !found {
return errWrongRoute
}
}
return nil
}

View file

@ -1,90 +0,0 @@
package eigentrustcalc
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
)
// Prm groups the required parameters of the Calculator's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Alpha parameter from origin EigenTrust algorithm
// http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Ch.5.1.
//
// Must be in range (0, 1).
AlphaProvider AlphaProvider
// Source of initial node trust values
//
// Must not be nil.
InitialTrustSource InitialTrustSource
DaughterTrustSource DaughterTrustIteratorProvider
IntermediateValueTarget common.WriterProvider
FinalResultTarget IntermediateWriterProvider
WorkerPool util.WorkerPool
}
// Calculator is a processor of a single iteration of EigenTrust algorithm.
//
// For correct operation, the Calculator must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Calculator is immediately ready to work through
// API of external control of calculations and data transfer.
type Calculator struct {
alpha, beta reputation.TrustValue // beta = 1 - alpha
prm Prm
opts *options
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Calculator.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Calculator does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Calculator {
switch {
case prm.AlphaProvider == nil:
panicOnPrmValue("AlphaProvider", prm.AlphaProvider)
case prm.InitialTrustSource == nil:
panicOnPrmValue("InitialTrustSource", prm.InitialTrustSource)
case prm.DaughterTrustSource == nil:
panicOnPrmValue("DaughterTrustSource", prm.DaughterTrustSource)
case prm.IntermediateValueTarget == nil:
panicOnPrmValue("IntermediateValueTarget", prm.IntermediateValueTarget)
case prm.FinalResultTarget == nil:
panicOnPrmValue("FinalResultTarget", prm.FinalResultTarget)
case prm.WorkerPool == nil:
panicOnPrmValue("WorkerPool", prm.WorkerPool)
}
o := defaultOpts()
for _, opt := range opts {
opt(o)
}
return &Calculator{
prm: prm,
opts: o,
}
}

View file

@ -1,295 +0,0 @@
package eigentrustcalc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"go.uber.org/zap"
)
type CalculatePrm struct {
last bool
ei eigentrust.EpochIteration
}
func (p *CalculatePrm) SetLast(last bool) {
p.last = last
}
func (p *CalculatePrm) SetEpochIteration(ei eigentrust.EpochIteration) {
p.ei = ei
}
func (c *Calculator) Calculate(ctx context.Context, prm CalculatePrm) {
alpha, err := c.prm.AlphaProvider.EigenTrustAlpha()
if err != nil {
c.opts.log.Debug(
logs.CalculatorFailedToGetAlphaParam,
zap.Error(err),
)
return
}
c.alpha = reputation.TrustValueFromFloat64(alpha)
c.beta = reputation.TrustValueFromFloat64(1 - alpha)
epochIteration := prm.ei
iter := epochIteration.I()
log := c.opts.log.With(
zap.Uint64("epoch", epochIteration.Epoch()),
zap.Uint32("iteration", iter),
)
if iter == 0 {
c.sendInitialValues(ctx, epochIteration)
return
}
// decrement iteration number to select the values collected
// on the previous stage
epochIteration.SetI(iter - 1)
consumersIter, err := c.prm.DaughterTrustSource.InitConsumersIterator(epochIteration)
if err != nil {
log.Debug(logs.CalculatorConsumersTrustIteratorsInitFailure,
zap.String("error", err.Error()),
)
return
}
// continue with initial iteration number
epochIteration.SetI(iter)
err = consumersIter.Iterate(func(daughter apireputation.PeerID, iter TrustIterator) error {
err := c.prm.WorkerPool.Submit(func() {
c.iterateDaughter(ctx, iterDaughterPrm{
lastIter: prm.last,
ei: epochIteration,
id: daughter,
consumersIter: iter,
})
})
if err != nil {
log.Debug(logs.CalculatorWorkerPoolSubmitFailure,
zap.String("error", err.Error()),
)
}
// don't stop trying
return nil
})
if err != nil {
log.Debug(logs.CalculatorIterateDaughtersConsumersFailed,
zap.String("error", err.Error()),
)
}
}
type iterDaughterPrm struct {
lastIter bool
ei EpochIterationInfo
id apireputation.PeerID
consumersIter TrustIterator
}
func (c *Calculator) iterateDaughter(ctx context.Context, p iterDaughterPrm) {
initTrust, err := c.prm.InitialTrustSource.InitialTrust(p.id)
if err != nil {
c.opts.log.Debug(logs.CalculatorGetInitialTrustFailure,
zap.Stringer("daughter", p.id),
zap.String("error", err.Error()),
)
return
}
daughterIter, err := c.prm.DaughterTrustSource.InitDaughterIterator(p.ei, p.id)
if err != nil {
c.opts.log.Debug(logs.CalculatorDaughterTrustIteratorsInitFailure,
zap.String("error", err.Error()),
)
return
}
sum := reputation.TrustZero
err = p.consumersIter.Iterate(func(trust reputation.Trust) error {
if !p.lastIter {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
sum.Add(trust.Value())
return nil
})
if err != nil {
c.opts.log.Debug(logs.CalculatorIterateOverDaughtersTrustsFailure,
zap.String("error", err.Error()),
)
return
}
// Alpha * Pd
initTrust.Mul(c.alpha)
sum.Mul(c.beta)
sum.Add(initTrust)
var intermediateTrust eigentrust.IterationTrust
intermediateTrust.SetEpoch(p.ei.Epoch())
intermediateTrust.SetPeer(p.id)
intermediateTrust.SetI(p.ei.I())
if p.lastIter {
c.processLastIteration(p, intermediateTrust, sum)
} else {
c.processIntermediateIteration(ctx, p, daughterIter, sum)
}
}
func (c *Calculator) processLastIteration(p iterDaughterPrm, intermediateTrust eigentrust.IterationTrust, sum reputation.TrustValue) {
finalWriter, err := c.prm.FinalResultTarget.InitIntermediateWriter(p.ei)
if err != nil {
c.opts.log.Debug(logs.CalculatorInitWriterFailure,
zap.String("error", err.Error()),
)
return
}
intermediateTrust.SetValue(sum)
err = finalWriter.WriteIntermediateTrust(intermediateTrust)
if err != nil {
c.opts.log.Debug(logs.CalculatorWriteFinalResultFailure,
zap.String("error", err.Error()),
)
return
}
}
func (c *Calculator) processIntermediateIteration(ctx context.Context, p iterDaughterPrm, daughterIter TrustIterator, sum reputation.TrustValue) {
intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(p.ei)
if err != nil {
c.opts.log.Debug(logs.CalculatorInitWriterFailure,
zap.String("error", err.Error()),
)
return
}
err = daughterIter.Iterate(func(trust reputation.Trust) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
val := trust.Value()
val.Mul(sum)
trust.SetValue(val)
err := intermediateWriter.Write(ctx, trust)
if err != nil {
c.opts.log.Debug(logs.CalculatorWriteValueFailure,
zap.String("error", err.Error()),
)
}
return nil
})
if err != nil {
c.opts.log.Debug(logs.CalculatorIterateDaughterTrustsFailure,
zap.String("error", err.Error()),
)
}
err = intermediateWriter.Close(ctx)
if err != nil {
c.opts.log.Error(
"could not close writer",
zap.String("error", err.Error()),
)
}
}
func (c *Calculator) sendInitialValues(ctx context.Context, epochInfo EpochIterationInfo) {
daughterIter, err := c.prm.DaughterTrustSource.InitAllDaughtersIterator(epochInfo)
if err != nil {
c.opts.log.Debug(logs.CalculatorAllDaughtersTrustIteratorsInitFailure,
zap.String("error", err.Error()),
)
return
}
intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(epochInfo)
if err != nil {
c.opts.log.Debug(logs.CalculatorInitWriterFailure,
zap.String("error", err.Error()),
)
return
}
err = daughterIter.Iterate(func(daughter apireputation.PeerID, iterator TrustIterator) error {
return iterator.Iterate(func(trust reputation.Trust) error {
trusted := trust.Peer()
initTrust, err := c.prm.InitialTrustSource.InitialTrust(trusted)
if err != nil {
c.opts.log.Debug(logs.CalculatorGetInitialTrustFailure,
zap.Stringer("peer", trusted),
zap.String("error", err.Error()),
)
// don't stop on single failure
return nil
}
initTrust.Mul(trust.Value())
trust.SetValue(initTrust)
err = intermediateWriter.Write(ctx, trust)
if err != nil {
c.opts.log.Debug(logs.CalculatorWriteValueFailure,
zap.String("error", err.Error()),
)
// don't stop on single failure
}
return nil
})
})
if err != nil {
c.opts.log.Debug(logs.CalculatorIterateOverAllDaughtersFailure,
zap.String("error", err.Error()),
)
}
err = intermediateWriter.Close(ctx)
if err != nil {
c.opts.log.Debug(logs.CalculatorCouldNotCloseWriter,
zap.String("error", err.Error()),
)
}
}

View file

@ -1,74 +0,0 @@
package eigentrustcalc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
)
type EpochIterationInfo interface {
// Must return epoch number to select the values
// for global trust calculation.
Epoch() uint64
// Must return the sequence number of the iteration.
I() uint32
}
// InitialTrustSource must provide initial(non-calculated)
// trusts to current node's daughter. Realization may depends
// on daughter.
type InitialTrustSource interface {
InitialTrust(apireputation.PeerID) (reputation.TrustValue, error)
}
// TrustIterator must iterate over all retrieved(or calculated) trusts
// and call passed TrustHandler on them.
type TrustIterator interface {
Iterate(reputation.TrustHandler) error
}
type PeerTrustsHandler func(apireputation.PeerID, TrustIterator) error
// PeerTrustsIterator must iterate over all nodes(PeerIDs) and provide
// TrustIterator for iteration over node's Trusts to others peers.
type PeerTrustsIterator interface {
Iterate(PeerTrustsHandler) error
}
type DaughterTrustIteratorProvider interface {
// InitDaughterIterator must init TrustIterator
// that iterates over received local trusts from
// daughter p for epochInfo.Epoch() epoch.
InitDaughterIterator(epochInfo EpochIterationInfo, p apireputation.PeerID) (TrustIterator, error)
// InitAllDaughtersIterator must init PeerTrustsIterator
// that must iterate over all daughters of the current
// node(manager) and all trusts received from them for
// epochInfo.Epoch() epoch.
InitAllDaughtersIterator(epochInfo EpochIterationInfo) (PeerTrustsIterator, error)
// InitConsumersIterator must init PeerTrustsIterator
// that must iterate over all daughters of the current
// node(manager) and their consumers' trusts received
// from other managers for epochInfo.Epoch() epoch and
// epochInfo.I() iteration.
InitConsumersIterator(EpochIterationInfo) (PeerTrustsIterator, error)
}
// IntermediateWriter must write intermediate result to contract.
// It may depends on realization either trust is sent directly to contract
// or via redirecting to other node.
type IntermediateWriter interface {
WriteIntermediateTrust(eigentrust.IterationTrust) error
}
// IntermediateWriterProvider must provide ready-to-work
// IntermediateWriter.
type IntermediateWriterProvider interface {
InitIntermediateWriter(EpochIterationInfo) (IntermediateWriter, error)
}
// AlphaProvider must provide information about required
// alpha parameter for eigen trust algorithm.
type AlphaProvider interface {
EigenTrustAlpha() (float64, error)
}

View file

@ -1,30 +0,0 @@
package eigentrustcalc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of Controller.
type Option func(*options)
type options struct {
log *logger.Logger
}
func defaultOpts() *options {
return &options{
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns option to specify logging component.
//
// Ignores nil values.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
if l != nil {
o.log = l
}
}
}

View file

@ -1,73 +0,0 @@
package eigentrustctrl
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
"go.uber.org/zap"
)
// ContinuePrm groups the required parameters of Continue operation.
type ContinuePrm struct {
Epoch uint64
}
type iterContext struct {
eigentrust.EpochIteration
iterationNumber uint32
last bool
}
func (x iterContext) Last() bool {
return x.last
}
// Continue moves the global reputation calculator to the next iteration.
func (c *Controller) Continue(ctx context.Context, prm ContinuePrm) {
c.mtx.Lock()
{
iterCtx, ok := c.mCtx[prm.Epoch]
if !ok {
iterCtx = new(iterContext)
c.mCtx[prm.Epoch] = iterCtx
iterCtx.EpochIteration.SetEpoch(prm.Epoch)
iterations, err := c.prm.IterationsProvider.EigenTrustIterations()
if err != nil {
c.opts.log.Error(logs.ControllerCouldNotGetEigenTrustIterationNumber,
zap.Error(err),
)
} else {
iterCtx.iterationNumber = uint32(iterations)
}
}
iterCtx.last = iterCtx.I() == iterCtx.iterationNumber-1
err := c.prm.WorkerPool.Submit(func() {
c.prm.DaughtersTrustCalculator.Calculate(ctx, iterCtx)
// iteration++
iterCtx.Increment()
})
if err != nil {
c.opts.log.Debug(logs.ControllerIterationSubmitFailure,
zap.String("error", err.Error()),
)
}
if iterCtx.last {
// will only live while the application is alive.
// during normal operation of the system. Also, such information
// number as already processed, but in any case it grows up
// In this case and worker pool failure we can mark epoch
delete(c.mCtx, prm.Epoch)
}
}
c.mtx.Unlock()
}

View file

@ -1,86 +0,0 @@
package eigentrustctrl
import (
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
)
// Prm groups the required parameters of the Controller's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Component of computing iteration of EigenTrust algorithm.
//
// Must not be nil.
DaughtersTrustCalculator DaughtersTrustCalculator
// IterationsProvider provides information about numbers
// of iterations for algorithm.
IterationsProvider IterationsProvider
// Routine execution pool for single epoch iteration.
WorkerPool util.WorkerPool
}
// Controller represents EigenTrust algorithm transient controller.
//
// Controller's main goal is to separate the two main stages of
// the calculation:
// 1. reporting local values to manager nodes
// 2. calculating global trusts of child nodes
//
// Calculation stages are controlled based on external signals
// that come from the application through the Controller's API.
//
// For correct operation, the controller must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the constructor is immediately ready to work through
// API of external control of calculations and data transfer.
type Controller struct {
prm Prm
opts *options
mtx sync.Mutex
mCtx map[uint64]*iterContext
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Controller.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Controller does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Controller {
switch {
case prm.IterationsProvider == nil:
panicOnPrmValue("IterationNumber", prm.IterationsProvider)
case prm.WorkerPool == nil:
panicOnPrmValue("WorkerPool", prm.WorkerPool)
case prm.DaughtersTrustCalculator == nil:
panicOnPrmValue("DaughtersTrustCalculator", prm.DaughtersTrustCalculator)
}
o := defaultOpts()
for _, opt := range opts {
opt(o)
}
return &Controller{
prm: prm,
opts: o,
mCtx: make(map[uint64]*iterContext),
}
}

View file

@ -1,37 +0,0 @@
package eigentrustctrl
import "context"
// IterationContext is a context of the i-th
// stage of iterative EigenTrust algorithm.
type IterationContext interface {
// Must return epoch number to select the values
// for global trust calculation.
Epoch() uint64
// Must return the sequence number of the iteration.
I() uint32
// Must return true if I() is the last iteration.
Last() bool
}
// DaughtersTrustCalculator is an interface of entity
// responsible for calculating the global trust of
// daughter nodes in terms of EigenTrust algorithm.
type DaughtersTrustCalculator interface {
// Must perform the iteration step of the loop
// for computing the global trust of all daughter
// nodes and sending intermediate values
// according to EigenTrust description
// http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Ch.5.1.
//
// Execution should be interrupted if ctx.Last().
Calculate(ctx context.Context, iter IterationContext)
}
// IterationsProvider must provide information about numbers
// of iterations for algorithm.
type IterationsProvider interface {
EigenTrustIterations() (uint64, error)
}

View file

@ -1,30 +0,0 @@
package eigentrustctrl
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of Controller.
type Option func(*options)
type options struct {
log *logger.Logger
}
func defaultOpts() *options {
return &options{
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns option to specify logging component.
//
// Ignores nil values.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
if l != nil {
o.log = l
}
}
}

View file

@ -1,44 +0,0 @@
package eigentrust
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
)
type EpochIteration struct {
e uint64
i uint32
}
func (x EpochIteration) Epoch() uint64 {
return x.e
}
func (x *EpochIteration) SetEpoch(e uint64) {
x.e = e
}
func (x EpochIteration) I() uint32 {
return x.i
}
func (x *EpochIteration) SetI(i uint32) {
x.i = i
}
func (x *EpochIteration) Increment() {
x.i++
}
type IterationTrust struct {
EpochIteration
reputation.Trust
}
func NewEpochIteration(epoch uint64, iter uint32) *EpochIteration {
ei := EpochIteration{}
ei.SetI(iter)
ei.SetEpoch(epoch)
return &ei
}

View file

@ -1,59 +0,0 @@
package routes
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
// Prm groups the required parameters of the Builder's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Manager builder for current node.
//
// Must not be nil.
ManagerBuilder common.ManagerBuilder
Log *logger.Logger
}
// Builder represents component that routes node to its managers.
//
// For correct operation, Builder must be created using
// the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Builder is immediately ready to work through API.
type Builder struct {
managerBuilder common.ManagerBuilder
log *logger.Logger
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Builder.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Builder does not require additional
// initialization and is completely ready for work.
func New(prm Prm) *Builder {
switch {
case prm.ManagerBuilder == nil:
panicOnPrmValue("ManagerBuilder", prm.ManagerBuilder)
case prm.Log == nil:
panicOnPrmValue("Logger", prm.Log)
}
return &Builder{
managerBuilder: prm.ManagerBuilder,
log: prm.Log,
}
}

View file

@ -1,33 +0,0 @@
package routes
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"go.uber.org/zap"
)
// NextStage builds Manager list for trusted node and returns it directly.
//
// If passed route has more than one point, then endpoint of the route is reached.
func (b *Builder) NextStage(epoch uint64, t reputation.Trust, passed []common.ServerInfo) ([]common.ServerInfo, error) {
passedLen := len(passed)
b.log.Debug(logs.RoutesBuildingNextStageForTrustRoute,
zap.Uint64("epoch", epoch),
zap.Int("passed_length", passedLen),
)
if passedLen > 1 {
return nil, nil
}
route, err := b.managerBuilder.BuildManagers(epoch, t.Peer())
if err != nil {
return nil, fmt.Errorf("could not build managers for epoch: %d: %w", epoch, err)
}
return route, nil
}

View file

@ -1,201 +0,0 @@
package consumerstorage
import (
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
eigentrustcalc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/calculator"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
)
// Put saves intermediate trust of the consumer to daughter peer.
func (x *Storage) Put(trust eigentrust.IterationTrust) {
var s *iterationConsumersStorage
x.mtx.Lock()
{
epoch := trust.Epoch()
s = x.mItems[epoch]
if s == nil {
s = &iterationConsumersStorage{
mItems: make(map[uint32]*ConsumersStorage, 1),
}
x.mItems[epoch] = s
}
}
x.mtx.Unlock()
s.put(trust)
}
// Consumers returns the storage of trusts of the consumers of the daughter peers
// for particular iteration of EigenTrust calculation for particular epoch.
//
// Returns false if there is no data for the epoch and iter.
func (x *Storage) Consumers(epoch uint64, iter uint32) (*ConsumersStorage, bool) {
var (
s *iterationConsumersStorage
ok bool
)
x.mtx.Lock()
{
s, ok = x.mItems[epoch]
}
x.mtx.Unlock()
if !ok {
return nil, false
}
return s.consumers(iter)
}
// maps iteration numbers of EigenTrust algorithm to repositories
// of the trusts of the consumers of the daughter peers.
type iterationConsumersStorage struct {
mtx sync.RWMutex
mItems map[uint32]*ConsumersStorage
}
func (x *iterationConsumersStorage) put(trust eigentrust.IterationTrust) {
var s *ConsumersStorage
x.mtx.Lock()
{
iter := trust.I()
s = x.mItems[iter]
if s == nil {
s = &ConsumersStorage{
mItems: make(map[string]*ConsumersTrusts, 1),
}
x.mItems[iter] = s
}
}
x.mtx.Unlock()
s.put(trust)
}
func (x *iterationConsumersStorage) consumers(iter uint32) (s *ConsumersStorage, ok bool) {
x.mtx.Lock()
{
s, ok = x.mItems[iter]
}
x.mtx.Unlock()
return
}
// ConsumersStorage represents in-memory storage of intermediate trusts
// of the peer consumers.
//
// Maps daughter peers to repositories of the trusts of their consumers.
type ConsumersStorage struct {
mtx sync.RWMutex
mItems map[string]*ConsumersTrusts
}
func (x *ConsumersStorage) put(trust eigentrust.IterationTrust) {
var s *ConsumersTrusts
x.mtx.Lock()
{
daughter := trust.Peer().EncodeToString()
s = x.mItems[daughter]
if s == nil {
s = &ConsumersTrusts{
mItems: make(map[string]reputation.Trust, 1),
}
x.mItems[daughter] = s
}
}
x.mtx.Unlock()
s.put(trust)
}
// Iterate passes IDs of the daughter peers with the trusts of their consumers to h.
//
// Returns errors from h directly.
func (x *ConsumersStorage) Iterate(h eigentrustcalc.PeerTrustsHandler) (err error) {
x.mtx.RLock()
{
for strTrusted, trusts := range x.mItems {
var trusted apireputation.PeerID
if strTrusted != "" {
err = trusted.DecodeString(strTrusted)
if err != nil {
panic(fmt.Sprintf("decode peer ID string %s: %v", strTrusted, err))
}
}
if err = h(trusted, trusts); err != nil {
break
}
}
}
x.mtx.RUnlock()
return
}
// ConsumersTrusts represents in-memory storage of the trusts
// of the consumer peers to some other peer.
type ConsumersTrusts struct {
mtx sync.RWMutex
mItems map[string]reputation.Trust
}
func (x *ConsumersTrusts) put(trust eigentrust.IterationTrust) {
x.mtx.Lock()
{
x.mItems[trust.TrustingPeer().EncodeToString()] = trust.Trust
}
x.mtx.Unlock()
}
// Iterate passes all stored trusts to h.
//
// Returns errors from h directly.
func (x *ConsumersTrusts) Iterate(h reputation.TrustHandler) (err error) {
x.mtx.RLock()
{
for _, trust := range x.mItems {
if err = h(trust); err != nil {
break
}
}
}
x.mtx.RUnlock()
return
}

View file

@ -1,40 +0,0 @@
package consumerstorage
import (
"sync"
)
// Prm groups the required parameters of the Storage's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
//
// The component is not parameterizable at the moment.
type Prm struct{}
// Storage represents in-memory storage of the trusts
// of the consumer peers.
//
// It maps epoch numbers to the repositories of intermediate
// trusts of the consumers of the daughter peers.
//
// For correct operation, Storage must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// Storage is immediately ready to work through API.
type Storage struct {
mtx sync.RWMutex
mItems map[uint64]*iterationConsumersStorage
}
// New creates a new instance of the Storage.
//
// The created Storage does not require additional
// initialization and is completely ready for work.
func New(_ Prm) *Storage {
return &Storage{
mItems: make(map[uint64]*iterationConsumersStorage),
}
}

View file

@ -1,177 +0,0 @@
package daughters
import (
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
eigentrustcalc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/calculator"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
)
// Put saves daughter peer's trust to its provider for the epoch.
func (x *Storage) Put(epoch uint64, trust reputation.Trust) {
var s *DaughterStorage
x.mtx.Lock()
{
s = x.mItems[epoch]
if s == nil {
s = &DaughterStorage{
mItems: make(map[string]*DaughterTrusts, 1),
}
x.mItems[epoch] = s
}
}
x.mtx.Unlock()
s.put(trust)
}
// DaughterTrusts returns daughter trusts for the epoch.
//
// Returns false if there is no data for the epoch and daughter.
func (x *Storage) DaughterTrusts(epoch uint64, daughter apireputation.PeerID) (*DaughterTrusts, bool) {
var (
s *DaughterStorage
ok bool
)
x.mtx.RLock()
{
s, ok = x.mItems[epoch]
}
x.mtx.RUnlock()
if !ok {
return nil, false
}
return s.daughterTrusts(daughter)
}
// AllDaughterTrusts returns daughter iterator for the epoch.
//
// Returns false if there is no data for the epoch and daughter.
func (x *Storage) AllDaughterTrusts(epoch uint64) (*DaughterStorage, bool) {
x.mtx.RLock()
defer x.mtx.RUnlock()
s, ok := x.mItems[epoch]
return s, ok
}
// DaughterStorage maps IDs of daughter peers to repositories of the local trusts to their providers.
type DaughterStorage struct {
mtx sync.RWMutex
mItems map[string]*DaughterTrusts
}
// Iterate passes IDs of the daughter peers with their trusts to h.
//
// Returns errors from h directly.
func (x *DaughterStorage) Iterate(h eigentrustcalc.PeerTrustsHandler) (err error) {
x.mtx.RLock()
{
for strDaughter, daughterTrusts := range x.mItems {
var daughter apireputation.PeerID
if strDaughter != "" {
err = daughter.DecodeString(strDaughter)
if err != nil {
panic(fmt.Sprintf("decode peer ID string %s: %v", strDaughter, err))
}
}
if err = h(daughter, daughterTrusts); err != nil {
break
}
}
}
x.mtx.RUnlock()
return
}
func (x *DaughterStorage) put(trust reputation.Trust) {
var dt *DaughterTrusts
x.mtx.Lock()
{
trusting := trust.TrustingPeer().EncodeToString()
dt = x.mItems[trusting]
if dt == nil {
dt = &DaughterTrusts{
mItems: make(map[string]reputation.Trust, 1),
}
x.mItems[trusting] = dt
}
}
x.mtx.Unlock()
dt.put(trust)
}
func (x *DaughterStorage) daughterTrusts(id apireputation.PeerID) (dt *DaughterTrusts, ok bool) {
x.mtx.RLock()
{
dt, ok = x.mItems[id.EncodeToString()]
}
x.mtx.RUnlock()
return
}
// DaughterTrusts represents in-memory storage of local trusts
// of the daughter peer to its providers.
//
// Maps IDs of daughter's providers to the local trusts to them.
type DaughterTrusts struct {
mtx sync.RWMutex
mItems map[string]reputation.Trust
}
func (x *DaughterTrusts) put(trust reputation.Trust) {
x.mtx.Lock()
{
x.mItems[trust.Peer().EncodeToString()] = trust
}
x.mtx.Unlock()
}
// Iterate passes all stored trusts to h.
//
// Returns errors from h directly.
func (x *DaughterTrusts) Iterate(h reputation.TrustHandler) (err error) {
x.mtx.RLock()
{
for _, trust := range x.mItems {
if err = h(trust); err != nil {
break
}
}
}
x.mtx.RUnlock()
return
}

View file

@ -1,38 +0,0 @@
package daughters
import "sync"
// Prm groups the required parameters of the Storage's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
//
// The component is not parameterizable at the moment.
type Prm struct{}
// Storage represents in-memory storage of local trust
// values of the daughter peers.
//
// It maps epoch numbers to the repositories of local trusts
// of the daughter peers.
//
// For correct operation, Storage must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// Storage is immediately ready to work through API.
type Storage struct {
mtx sync.RWMutex
mItems map[uint64]*DaughterStorage
}
// New creates a new instance of the Storage.
//
// The created Storage does not require additional
// initialization and is completely ready for work.
func New(_ Prm) *Storage {
return &Storage{
mItems: make(map[uint64]*DaughterStorage),
}
}

View file

@ -1,193 +0,0 @@
package trustcontroller
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// ReportPrm groups the required parameters of the Controller.Report method.
type ReportPrm struct {
epoch uint64
}
// SetEpoch sets epoch number to select reputation values.
func (p *ReportPrm) SetEpoch(e uint64) {
p.epoch = e
}
// Report reports local reputation values.
//
// Single Report operation overtakes all data from LocalTrustSource
// to LocalTrustTarget (Controller's parameters).
//
// Each call acquires a report context for an Epoch parameter.
// At the very end of the operation, the context is released.
func (c *Controller) Report(ctx context.Context, prm ReportPrm) {
// acquire report
rCtx, reporter := c.acquireReporter(ctx, prm.epoch)
if reporter == nil {
return
}
// report local trust values
reporter.report(rCtx)
// finally stop and free the report
c.freeReport(prm.epoch, reporter.log)
}
type reporter struct {
epoch uint64
ctrl *Controller
log *logger.Logger
ep common.EpochProvider
}
type epochProvider struct {
epoch uint64
}
func (c epochProvider) Epoch() uint64 {
return c.epoch
}
func (c *Controller) acquireReporter(ctx context.Context, epoch uint64) (context.Context, *reporter) {
started := true
c.mtx.Lock()
{
if cancel := c.mCtx[epoch]; cancel == nil {
ctx, cancel = context.WithCancel(ctx)
c.mCtx[epoch] = cancel
started = false
}
}
c.mtx.Unlock()
log := &logger.Logger{Logger: c.opts.log.With(
zap.Uint64("epoch", epoch),
)}
if started {
log.Debug(logs.ControllerReportIsAlreadyStarted)
return ctx, nil
}
return ctx, &reporter{
epoch: epoch,
ctrl: c,
log: log,
ep: &epochProvider{
epoch: epoch,
},
}
}
func (c *reporter) report(ctx context.Context) {
c.log.Debug(logs.ControllerStartingToReportLocalTrustValues)
// initialize iterator over locally collected values
iterator, err := c.ctrl.prm.LocalTrustSource.InitIterator(c.ep)
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeIteratorOverLocalTrustValues,
zap.String("error", err.Error()),
)
return
}
// initialize target of local trust values
targetWriter, err := c.ctrl.prm.LocalTrustTarget.InitWriter(c.ep)
if err != nil {
c.log.Debug(logs.ControllerCouldNotInitializeLocalTrustTarget,
zap.String("error", err.Error()),
)
return
}
// iterate over all values and write them to the target
err = iterator.Iterate(
func(t reputation.Trust) error {
// check if context is done
if err := ctx.Err(); err != nil {
return err
}
return targetWriter.Write(ctx, t)
},
)
if err != nil && !errors.Is(err, context.Canceled) {
c.log.Debug(logs.ControllerIteratorOverLocalTrustFailed,
zap.String("error", err.Error()),
)
return
}
// finish writing
err = targetWriter.Close(ctx)
if err != nil {
c.log.Debug(logs.ControllerCouldNotFinishWritingLocalTrustValues,
zap.String("error", err.Error()),
)
return
}
c.log.Debug(logs.ControllerReportingSuccessfullyFinished)
}
func (c *Controller) freeReport(epoch uint64, log *logger.Logger) {
var stopped bool
c.mtx.Lock()
{
var cancel context.CancelFunc
cancel, stopped = c.mCtx[epoch]
if stopped {
cancel()
delete(c.mCtx, epoch)
}
}
c.mtx.Unlock()
if stopped {
log.Debug(logs.ControllerReportingSuccessfullyInterrupted)
} else {
log.Debug(logs.ControllerReportingIsNotStartedOrAlreadyInterrupted)
}
}
// StopPrm groups the required parameters of the Controller.Stop method.
type StopPrm struct {
epoch uint64
}
// SetEpoch sets epoch number the processing of the values of which must be interrupted.
func (p *StopPrm) SetEpoch(e uint64) {
p.epoch = e
}
// Stop interrupts the processing of local trust values.
//
// Releases acquired report context.
func (c *Controller) Stop(prm StopPrm) {
c.freeReport(
prm.epoch,
&logger.Logger{Logger: c.opts.log.With(zap.Uint64("epoch", prm.epoch))},
)
}

View file

@ -1,84 +0,0 @@
package trustcontroller
import (
"context"
"fmt"
"sync"
reputationrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common/router"
)
// Prm groups the required parameters of the Controller's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Iterator over the reputation values
// collected by the node locally.
//
// Must not be nil.
LocalTrustSource IteratorProvider
// Place of recording the local values of
// trust to other nodes.
//
// Must not be nil.
LocalTrustTarget *reputationrouter.Router
}
// Controller represents main handler for starting
// and interrupting the reporting local trust values.
//
// It binds the interfaces of the local value stores
// to the target storage points. Controller is abstracted
// from the internal storage device and the network location
// of the connecting components. At its core, it is a
// high-level start-stop trigger for reporting.
//
// For correct operation, the controller must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the constructor is immediately ready to work through
// API of external control of calculations and data transfer.
type Controller struct {
prm Prm
opts *options
mtx sync.Mutex
mCtx map[uint64]context.CancelFunc
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Controller.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Controller does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Controller {
switch {
case prm.LocalTrustSource == nil:
panicOnPrmValue("LocalTrustSource", prm.LocalTrustSource)
case prm.LocalTrustTarget == nil:
panicOnPrmValue("LocalTrustTarget", prm.LocalTrustTarget)
}
o := defaultOpts()
for _, opt := range opts {
opt(o)
}
return &Controller{
prm: prm,
opts: o,
mCtx: make(map[uint64]context.CancelFunc),
}
}

View file

@ -1,34 +0,0 @@
package trustcontroller
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
)
// Iterator is a group of methods provided by entity
// which can iterate over a group of reputation.Trust values.
type Iterator interface {
// Iterate must start an iterator over all trust values.
// For each value should call a handler, the error
// of which should be directly returned from the method.
//
// Internal failures of the iterator are also signaled via
// an error. After a successful call to the last value
// handler, nil should be returned.
Iterate(reputation.TrustHandler) error
}
// IteratorProvider is a group of methods provided
// by entity which generates iterators over
// reputation.Trust values.
type IteratorProvider interface {
// InitIterator should return an initialized Iterator
// that iterates over values from IteratorContext.Epoch() epoch.
//
// Initialization problems are reported via error.
// If no error was returned, then the Iterator must not be nil.
//
// Implementations can have different logic for different
// contexts, so specific ones may document their own behavior.
InitIterator(common.EpochProvider) (Iterator, error)
}

View file

@ -1,30 +0,0 @@
package trustcontroller
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option sets an optional parameter of Controller.
type Option func(*options)
type options struct {
log *logger.Logger
}
func defaultOpts() *options {
return &options{
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns option to specify logging component.
//
// Ignores nil values.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
if l != nil {
o.log = l
}
}
}

View file

@ -1,32 +0,0 @@
package trustcontroller
import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
type storageWrapper struct {
w common.Writer
i Iterator
}
func (s storageWrapper) InitIterator(common.EpochProvider) (Iterator, error) {
return s.i, nil
}
func (s storageWrapper) InitWriter(common.EpochProvider) (common.Writer, error) {
return s.w, nil
}
// SimpleIteratorProvider returns IteratorProvider that provides
// static context-independent Iterator.
func SimpleIteratorProvider(i Iterator) IteratorProvider {
return &storageWrapper{
i: i,
}
}
// SimpleWriterProvider returns WriterProvider that provides
// static context-independent Writer.
func SimpleWriterProvider(w common.Writer) common.WriterProvider {
return &storageWrapper{
w: w,
}
}

View file

@ -1,59 +0,0 @@
package routes
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
// Prm groups the required parameters of the Builder's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// Manager builder for current node.
//
// Must not be nil.
ManagerBuilder common.ManagerBuilder
Log *logger.Logger
}
// Builder represents component that routes node to its managers.
//
// For correct operation, Builder must be created using
// the constructor (New) based on the required parameters
// and optional components. After successful creation,
// the Builder is immediately ready to work through API.
type Builder struct {
managerBuilder common.ManagerBuilder
log *logger.Logger
}
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
}
// New creates a new instance of the Builder.
//
// Panics if at least one value of the parameters is invalid.
//
// The created Builder does not require additional
// initialization and is completely ready for work.
func New(prm Prm) *Builder {
switch {
case prm.ManagerBuilder == nil:
panicOnPrmValue("ManagerBuilder", prm.ManagerBuilder)
case prm.Log == nil:
panicOnPrmValue("Logger", prm.Log)
}
return &Builder{
managerBuilder: prm.ManagerBuilder,
log: prm.Log,
}
}

View file

@ -1,33 +0,0 @@
package routes
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
"go.uber.org/zap"
)
// NextStage builds Manager list for trusting node and returns it directly.
//
// If passed route has more than one point, then endpoint of the route is reached.
func (b *Builder) NextStage(epoch uint64, t reputation.Trust, passed []common.ServerInfo) ([]common.ServerInfo, error) {
passedLen := len(passed)
b.log.Debug(logs.RoutesBuildingNextStageForLocalTrustRoute,
zap.Uint64("epoch", epoch),
zap.Int("passed_length", passedLen),
)
if passedLen > 1 {
return nil, nil
}
route, err := b.managerBuilder.BuildManagers(epoch, t.TrustingPeer())
if err != nil {
return nil, fmt.Errorf("could not build managers for epoch: %d: %w", epoch, err)
}
return route, nil
}

View file

@ -1,175 +0,0 @@
package truststorage
import (
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
)
// UpdatePrm groups the parameters of Storage's Update operation.
type UpdatePrm struct {
sat bool
epoch uint64
peer apireputation.PeerID
}
// SetEpoch sets number of the epoch
// when the interaction happened.
func (p *UpdatePrm) SetEpoch(e uint64) {
p.epoch = e
}
// SetPeer sets identifier of the peer
// with which the local node interacted.
func (p *UpdatePrm) SetPeer(id apireputation.PeerID) {
p.peer = id
}
// SetSatisfactory sets successful completion status.
func (p *UpdatePrm) SetSatisfactory(sat bool) {
p.sat = sat
}
type trustValue struct {
sat, all int
}
// EpochTrustValueStorage represents storage of
// the trust values by particular epoch.
type EpochTrustValueStorage struct {
mtx sync.RWMutex
mItems map[string]*trustValue
}
func newTrustValueStorage() *EpochTrustValueStorage {
return &EpochTrustValueStorage{
mItems: make(map[string]*trustValue, 1),
}
}
func stringifyPeerID(id apireputation.PeerID) string {
return string(id.PublicKey())
}
func peerIDFromString(str string) (res apireputation.PeerID) {
res.SetPublicKey([]byte(str))
return
}
func (s *EpochTrustValueStorage) update(prm UpdatePrm) {
s.mtx.Lock()
{
strID := stringifyPeerID(prm.peer)
val, ok := s.mItems[strID]
if !ok {
val = new(trustValue)
s.mItems[strID] = val
}
if prm.sat {
val.sat++
}
val.all++
}
s.mtx.Unlock()
}
// Update updates the number of satisfactory transactions with peer.
func (s *Storage) Update(prm UpdatePrm) {
var trustStorage *EpochTrustValueStorage
s.mtx.Lock()
{
var (
ok bool
epoch = prm.epoch
)
trustStorage, ok = s.mItems[epoch]
if !ok {
trustStorage = newTrustValueStorage()
s.mItems[epoch] = trustStorage
}
}
s.mtx.Unlock()
trustStorage.update(prm)
}
// ErrNoPositiveTrust is returned by iterator when
// there is no positive number of successful transactions.
var ErrNoPositiveTrust = errors.New("no positive trust")
// DataForEpoch returns EpochValueStorage for epoch.
//
// If there is no data for the epoch, ErrNoPositiveTrust returns.
func (s *Storage) DataForEpoch(epoch uint64) (*EpochTrustValueStorage, error) {
s.mtx.RLock()
trustStorage, ok := s.mItems[epoch]
s.mtx.RUnlock()
if !ok {
return nil, ErrNoPositiveTrust
}
return trustStorage, nil
}
// Iterate iterates over normalized trust values and passes them to parameterized handler.
//
// Values are normalized according to http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Chapter 4.5.
// If divisor in formula is zero, ErrNoPositiveTrust returns.
func (s *EpochTrustValueStorage) Iterate(h reputation.TrustHandler) (err error) {
s.mtx.RLock()
{
var (
sum reputation.TrustValue
mVals = make(map[string]reputation.TrustValue, len(s.mItems))
)
// iterate first time to calculate normalizing divisor
for strID, val := range s.mItems {
if val.all > 0 {
num := reputation.TrustValueFromInt(val.sat)
denom := reputation.TrustValueFromInt(val.all)
v := num.Div(denom)
mVals[strID] = v
sum.Add(v)
}
}
err = ErrNoPositiveTrust
if !sum.IsZero() {
for strID, val := range mVals {
t := reputation.Trust{}
t.SetPeer(peerIDFromString(strID))
t.SetValue(val.Div(sum))
if err = h(t); err != nil {
break
}
}
}
}
s.mtx.RUnlock()
return
}

View file

@ -1,41 +0,0 @@
package truststorage
import (
"sync"
)
// Prm groups the required parameters of the Storage's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct{}
// Storage represents in-memory storage of
// local reputation values.
//
// Storage provides access to normalized local trust
// values through iterator interface.
//
// For correct operation, Storage must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// Storage is immediately ready to work through API.
type Storage struct {
prm Prm
mtx sync.RWMutex
mItems map[uint64]*EpochTrustValueStorage
}
// New creates a new instance of the Storage.
//
// The created Storage does not require additional
// initialization and is completely ready for work.
func New(prm Prm) *Storage {
return &Storage{
prm: prm,
mItems: make(map[uint64]*EpochTrustValueStorage),
}
}

View file

@ -1,50 +0,0 @@
package reputationrpc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
)
type responseService struct {
respSvc *response.Service
svc Server
}
// NewResponseService returns reputation service server instance that passes
// internal service call to response service.
func NewResponseService(cnrSvc Server, respSvc *response.Service) Server {
return &responseService{
respSvc: respSvc,
svc: cnrSvc,
}
}
func (s *responseService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceLocalTrustResponse), nil
}
func (s *responseService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
resp, err := s.respSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceIntermediateResultResponse), nil
}

View file

@ -1,13 +0,0 @@
package reputationrpc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation"
)
// Server is an interface of the FrostFS API v2 Reputation service server.
type Server interface {
AnnounceLocalTrust(context.Context, *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error)
AnnounceIntermediateResult(context.Context, *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error)
}

View file

@ -1,54 +0,0 @@
package reputationrpc
import (
"context"
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
)
type signService struct {
sigSvc *util.SignService
svc Server
}
func NewSignService(key *ecdsa.PrivateKey, svc Server) Server {
return &signService{
sigSvc: util.NewUnarySignService(key),
svc: svc,
}
}
func (s *signService) AnnounceLocalTrust(ctx context.Context, req *reputation.AnnounceLocalTrustRequest) (*reputation.AnnounceLocalTrustResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceLocalTrust(ctx, req.(*reputation.AnnounceLocalTrustRequest))
},
func() util.ResponseMessage {
return new(reputation.AnnounceLocalTrustResponse)
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceLocalTrustResponse), nil
}
func (s *signService) AnnounceIntermediateResult(ctx context.Context, req *reputation.AnnounceIntermediateResultRequest) (*reputation.AnnounceIntermediateResultResponse, error) {
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
func(ctx context.Context, req any) (util.ResponseMessage, error) {
return s.svc.AnnounceIntermediateResult(ctx, req.(*reputation.AnnounceIntermediateResultRequest))
},
func() util.ResponseMessage {
return new(reputation.AnnounceIntermediateResultResponse)
},
)
if err != nil {
return nil, err
}
return resp.(*reputation.AnnounceIntermediateResultResponse), nil
}

View file

@ -1,102 +0,0 @@
package reputation
import (
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
)
// TrustValue represents the numeric value of the node's trust.
type TrustValue float64
const (
// TrustOne is a trust value equal to one.
TrustOne = TrustValue(1)
// TrustZero is a trust value equal to zero.
TrustZero = TrustValue(0)
)
// TrustValueFromFloat64 converts float64 to TrustValue.
func TrustValueFromFloat64(v float64) TrustValue {
return TrustValue(v)
}
// TrustValueFromInt converts int to TrustValue.
func TrustValueFromInt(v int) TrustValue {
return TrustValue(v)
}
func (v TrustValue) String() string {
return strconv.FormatFloat(float64(v), 'f', -1, 64)
}
// Float64 converts TrustValue to float64.
func (v TrustValue) Float64() float64 {
return float64(v)
}
// Add adds v2 to v.
func (v *TrustValue) Add(v2 TrustValue) {
*v = *v + v2
}
// Div returns the result of dividing v by v2.
func (v TrustValue) Div(v2 TrustValue) TrustValue {
return v / v2
}
// Mul multiplies v by v2.
func (v *TrustValue) Mul(v2 TrustValue) {
*v *= v2
}
// IsZero returns true if v equal to zero.
func (v TrustValue) IsZero() bool {
return v == 0
}
// Trust represents peer's trust (reputation).
type Trust struct {
trusting, peer reputation.PeerID
val TrustValue
}
// TrustHandler describes the signature of the reputation.Trust
// value handling function.
//
// Termination of processing without failures is usually signaled
// with a zero error, while a specific value may describe the reason
// for failure.
type TrustHandler func(Trust) error
// Value returns peer's trust value.
func (t Trust) Value() TrustValue {
return t.val
}
// SetValue sets peer's trust value.
func (t *Trust) SetValue(val TrustValue) {
t.val = val
}
// Peer returns trusted peer ID.
func (t Trust) Peer() reputation.PeerID {
return t.peer
}
// SetPeer sets trusted peer ID.
func (t *Trust) SetPeer(id reputation.PeerID) {
t.peer = id
}
// TrustingPeer returns trusting peer ID.
func (t Trust) TrustingPeer() reputation.PeerID {
return t.trusting
}
// SetTrustingPeer sets trusting peer ID.
func (t *Trust) SetTrustingPeer(id reputation.PeerID) {
t.trusting = id
}