diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index d5f711a5..cddedabe 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -99,7 +99,7 @@ func initApp(ctx context.Context, c *cfg) { initAndLog(c, "accounting", func(c *cfg) { initAccountingService(ctx, c) }) initAndLog(c, "container", func(c *cfg) { initContainerService(ctx, c) }) initAndLog(c, "session", initSessionService) - initAndLog(c, "reputation", initReputationService) + initAndLog(c, "reputation", func(c *cfg) { initReputationService(ctx, c) }) initAndLog(c, "notification", initNotifications) initAndLog(c, "object", initObjectService) initAndLog(c, "tree", initTreeService) diff --git a/cmd/frostfs-node/reputation.go b/cmd/frostfs-node/reputation.go index 7b43443c..a96bd066 100644 --- a/cmd/frostfs-node/reputation.go +++ b/cmd/frostfs-node/reputation.go @@ -33,7 +33,7 @@ import ( "go.uber.org/zap" ) -func initReputationService(c *cfg) { +func initReputationService(ctx context.Context, c *cfg) { wrap, err := repClient.NewFromMorph(c.cfgMorph.client, c.cfgReputation.scriptHash, 0, repClient.TryNotary()) fatalOnErr(err) @@ -73,7 +73,7 @@ func initReputationService(c *cfg) { c.cfgReputation.localTrustCtrl = createLocalTrustController(c, localTrustLogger, localKey, localTrustRouter) - addReputationReportHandler(c) + addReputationReportHandler(ctx, c) server := grpcreputation.New( reputationrpc.NewSignService( @@ -98,10 +98,10 @@ func initReputationService(c *cfg) { // initialize eigen trust block timer newEigenTrustIterTimer(c) - addEigenTrustEpochHandler(c, eigenTrustController) + addEigenTrustEpochHandler(ctx, c, eigenTrustController) } -func addReputationReportHandler(c *cfg) { +func addReputationReportHandler(ctx context.Context, c *cfg) { addNewEpochAsyncNotificationHandler( c, func(ev event.Event) { @@ -112,12 +112,12 @@ func addReputationReportHandler(c *cfg) { // report collected values from previous epoch reportPrm.SetEpoch(ev.(netmap.NewEpoch).EpochNumber() - 1) - c.cfgReputation.localTrustCtrl.Report(reportPrm) + c.cfgReputation.localTrustCtrl.Report(ctx, reportPrm) }, ) } -func addEigenTrustEpochHandler(c *cfg, eigenTrustController *eigentrustctrl.Controller) { +func addEigenTrustEpochHandler(ctx context.Context, c *cfg, eigenTrustController *eigentrustctrl.Controller) { addNewEpochAsyncNotificationHandler( c, func(e event.Event) { @@ -138,7 +138,7 @@ func addEigenTrustEpochHandler(c *cfg, eigenTrustController *eigentrustctrl.Cont } epochTimer, err := ticker.NewIterationsTicker(duration, iterations, func() { - eigenTrustController.Continue( + eigenTrustController.Continue(ctx, eigentrustctrl.ContinuePrm{ Epoch: epoch - 1, }, @@ -286,8 +286,8 @@ func createLocalTrustController(c *cfg, localTrustLogger *logger.Logger, localKe type reputationServer struct { *cfg log *logger.Logger - localRouter reputationcommon.WriterProvider - intermediateRouter reputationcommon.WriterProvider + localRouter *reputationrouter.Router + intermediateRouter *reputationrouter.Router routeBuilder reputationrouter.Builder } @@ -297,18 +297,17 @@ func (s *reputationServer) AnnounceLocalTrust(ctx context.Context, req *v2reputa body := req.GetBody() - eCtx := &common.EpochContext{ - Context: ctx, - E: body.GetEpoch(), + ep := &common.EpochProvider{ + E: body.GetEpoch(), } - w, err := s.localRouter.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute)) + w, err := s.localRouter.InitWriter(reputationrouter.NewRouteInfo(ep, passedRoute)) if err != nil { return nil, fmt.Errorf("could not initialize local trust writer: %w", err) } for _, trust := range body.GetTrusts() { - err = s.processLocalTrust(body.GetEpoch(), apiToLocalTrust(&trust, passedRoute[0].PublicKey()), passedRoute, w) + err = s.processLocalTrust(ctx, body.GetEpoch(), apiToLocalTrust(&trust, passedRoute[0].PublicKey()), passedRoute, w) if err != nil { return nil, fmt.Errorf("could not write one of local trusts: %w", err) } @@ -326,9 +325,9 @@ func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req * body := req.GetBody() - eiCtx := eigentrust.NewIterContext(ctx, body.GetEpoch(), body.GetIteration()) + ei := eigentrust.NewEpochIteration(body.GetEpoch(), body.GetIteration()) - w, err := s.intermediateRouter.InitWriter(reputationrouter.NewRouteContext(eiCtx, passedRoute)) + w, err := s.intermediateRouter.InitWriter(reputationrouter.NewRouteInfo(ei, passedRoute)) if err != nil { return nil, fmt.Errorf("could not initialize trust writer: %w", err) } @@ -337,7 +336,7 @@ func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req * trust := apiToLocalTrust(v2Trust.GetTrust(), v2Trust.GetTrustingPeer().GetPublicKey()) - err = w.Write(trust) + err = w.Write(ctx, trust) if err != nil { return nil, fmt.Errorf("could not write trust: %w", err) } @@ -348,14 +347,14 @@ func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req * return resp, nil } -func (s *reputationServer) processLocalTrust(epoch uint64, t reputation.Trust, +func (s *reputationServer) processLocalTrust(ctx context.Context, epoch uint64, t reputation.Trust, passedRoute []reputationcommon.ServerInfo, w reputationcommon.Writer) error { err := reputationrouter.CheckRoute(s.routeBuilder, epoch, t, passedRoute) if err != nil { return fmt.Errorf("wrong route of reputation trust value: %w", err) } - return w.Write(t) + return w.Write(ctx, t) } // apiToLocalTrust converts v2 Trust to local reputation.Trust, adding trustingPeer. diff --git a/cmd/frostfs-node/reputation/common/remote.go b/cmd/frostfs-node/reputation/common/remote.go index 0fe0a7fd..cd0a024a 100644 --- a/cmd/frostfs-node/reputation/common/remote.go +++ b/cmd/frostfs-node/reputation/common/remote.go @@ -6,7 +6,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common" - reputationrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common/router" trustcontroller "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/local/controller" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" ) @@ -22,12 +21,12 @@ type clientKeyRemoteProvider interface { WithClient(client.Client) reputationcommon.WriterProvider } -// remoteTrustProvider is an implementation of reputation RemoteWriterProvider interface. +// RemoteTrustProvider is an implementation of reputation RemoteWriterProvider interface. // It caches clients, checks if it is the end of the route and checks either the current // node is a remote target or not. // // remoteTrustProvider requires to be provided with clientKeyRemoteProvider. -type remoteTrustProvider struct { +type RemoteTrustProvider struct { netmapKeys netmap.AnnouncedKeys deadEndProvider reputationcommon.WriterProvider clientCache clientCache @@ -48,7 +47,7 @@ type RemoteProviderPrm struct { Log *logger.Logger } -func NewRemoteTrustProvider(prm RemoteProviderPrm) reputationrouter.RemoteWriterProvider { +func NewRemoteTrustProvider(prm RemoteProviderPrm) *RemoteTrustProvider { switch { case prm.NetmapKeys == nil: PanicOnPrmValue("NetmapKeys", prm.NetmapKeys) @@ -62,7 +61,7 @@ func NewRemoteTrustProvider(prm RemoteProviderPrm) reputationrouter.RemoteWriter PanicOnPrmValue("Logger", prm.Log) } - return &remoteTrustProvider{ + return &RemoteTrustProvider{ netmapKeys: prm.NetmapKeys, deadEndProvider: prm.DeadEndProvider, clientCache: prm.ClientCache, @@ -71,7 +70,7 @@ func NewRemoteTrustProvider(prm RemoteProviderPrm) reputationrouter.RemoteWriter } } -func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (reputationcommon.WriterProvider, error) { +func (rtp *RemoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (reputationcommon.WriterProvider, error) { rtp.log.Debug("initializing remote writer provider") if srv == nil { diff --git a/cmd/frostfs-node/reputation/common/util.go b/cmd/frostfs-node/reputation/common/util.go index 28351d0c..443adb38 100644 --- a/cmd/frostfs-node/reputation/common/util.go +++ b/cmd/frostfs-node/reputation/common/util.go @@ -7,24 +7,21 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation" ) -// EpochContext is a std context extended with epoch data. -// nolint: containedctx -type EpochContext struct { - context.Context +type EpochProvider struct { E uint64 } -func (ctx *EpochContext) Epoch() uint64 { - return ctx.E +func (ep *EpochProvider) Epoch() uint64 { + return ep.E } type NopReputationWriter struct{} -func (NopReputationWriter) Write(reputation.Trust) error { +func (NopReputationWriter) Write(context.Context, reputation.Trust) error { return nil } -func (NopReputationWriter) Close() error { +func (NopReputationWriter) Close(context.Context) error { return nil } diff --git a/cmd/frostfs-node/reputation/intermediate/calculator.go b/cmd/frostfs-node/reputation/intermediate/calculator.go index 8bc74324..73dd1231 100644 --- a/cmd/frostfs-node/reputation/intermediate/calculator.go +++ b/cmd/frostfs-node/reputation/intermediate/calculator.go @@ -1,6 +1,7 @@ package intermediate import ( + "context" "errors" "fmt" @@ -42,15 +43,15 @@ type DaughtersTrustCalculator struct { } // Calculate converts and passes values to the wrapped calculator. -func (c *DaughtersTrustCalculator) Calculate(ctx eigentrustctrl.IterationContext) { +func (c *DaughtersTrustCalculator) Calculate(ctx context.Context, iterCtx eigentrustctrl.IterationContext) { calcPrm := eigencalc.CalculatePrm{} epochIteration := eigentrust.EpochIteration{} - epochIteration.SetEpoch(ctx.Epoch()) - epochIteration.SetI(ctx.I()) + epochIteration.SetEpoch(iterCtx.Epoch()) + epochIteration.SetI(iterCtx.I()) - calcPrm.SetLast(ctx.Last()) + calcPrm.SetLast(iterCtx.Last()) calcPrm.SetEpochIteration(epochIteration) - c.Calculator.Calculate(calcPrm) + c.Calculator.Calculate(ctx, calcPrm) } diff --git a/cmd/frostfs-node/reputation/intermediate/consumers.go b/cmd/frostfs-node/reputation/intermediate/consumers.go index fb89c439..33eab605 100644 --- a/cmd/frostfs-node/reputation/intermediate/consumers.go +++ b/cmd/frostfs-node/reputation/intermediate/consumers.go @@ -1,6 +1,8 @@ package intermediate import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation" reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust" @@ -23,41 +25,41 @@ type ConsumerStorageWriterProvider struct { // that writes passed consumer's Trust values to the Consumer storage. After writing // that, values can be used in eigenTrust algorithm's iterations. type ConsumerTrustWriter struct { - log *logger.Logger - storage *consumerstorage.Storage - eiCtx eigencalc.Context + log *logger.Logger + storage *consumerstorage.Storage + iterInfo eigencalc.EpochIterationInfo } -func (w *ConsumerTrustWriter) Write(t reputation.Trust) error { +func (w *ConsumerTrustWriter) Write(_ context.Context, t reputation.Trust) error { w.log.Debug("writing received consumer's trusts", - zap.Uint64("epoch", w.eiCtx.Epoch()), - zap.Uint32("iteration", w.eiCtx.I()), + zap.Uint64("epoch", w.iterInfo.Epoch()), + zap.Uint32("iteration", w.iterInfo.I()), zap.Stringer("trusting_peer", t.TrustingPeer()), zap.Stringer("trusted_peer", t.Peer()), ) trust := eigentrust.IterationTrust{Trust: t} - trust.SetEpoch(w.eiCtx.Epoch()) - trust.SetI(w.eiCtx.I()) + trust.SetEpoch(w.iterInfo.Epoch()) + trust.SetI(w.iterInfo.I()) w.storage.Put(trust) return nil } -func (w *ConsumerTrustWriter) Close() error { +func (w *ConsumerTrustWriter) Close(context.Context) error { return nil } -func (s *ConsumerStorageWriterProvider) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) { - eiCtx, ok := ctx.(eigencalc.Context) +func (s *ConsumerStorageWriterProvider) InitWriter(ep reputationcommon.EpochProvider) (reputationcommon.Writer, error) { + iterInfo, ok := ep.(eigencalc.EpochIterationInfo) if !ok { panic(ErrIncorrectContextPanicMsg) } return &ConsumerTrustWriter{ - log: s.Log, - storage: s.Storage, - eiCtx: eiCtx, + log: s.Log, + storage: s.Storage, + iterInfo: iterInfo, }, nil } diff --git a/cmd/frostfs-node/reputation/intermediate/contract.go b/cmd/frostfs-node/reputation/intermediate/contract.go index 7e641132..6303b121 100644 --- a/cmd/frostfs-node/reputation/intermediate/contract.go +++ b/cmd/frostfs-node/reputation/intermediate/contract.go @@ -51,7 +51,7 @@ type FinalWriterProvider struct { } func (fwp FinalWriterProvider) InitIntermediateWriter( - _ eigentrustcalc.Context) (eigentrustcalc.IntermediateWriter, error) { + _ eigentrustcalc.EpochIterationInfo) (eigentrustcalc.IntermediateWriter, error) { return &FinalWriter{ privatKey: fwp.prm.PrivatKey, pubKey: fwp.prm.PubKey, diff --git a/cmd/frostfs-node/reputation/intermediate/daughters.go b/cmd/frostfs-node/reputation/intermediate/daughters.go index 641a0afe..d72eead4 100644 --- a/cmd/frostfs-node/reputation/intermediate/daughters.go +++ b/cmd/frostfs-node/reputation/intermediate/daughters.go @@ -1,6 +1,8 @@ package intermediate import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation" reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust/storage/daughters" @@ -21,28 +23,28 @@ type DaughterStorageWriterProvider struct { type DaughterTrustWriter struct { log *logger.Logger storage *daughters.Storage - ctx reputationcommon.Context + ep reputationcommon.EpochProvider } -func (w *DaughterTrustWriter) Write(t reputation.Trust) error { +func (w *DaughterTrustWriter) Write(_ context.Context, t reputation.Trust) error { w.log.Debug("writing received daughter's trusts", - zap.Uint64("epoch", w.ctx.Epoch()), + zap.Uint64("epoch", w.ep.Epoch()), zap.Stringer("trusting_peer", t.TrustingPeer()), zap.Stringer("trusted_peer", t.Peer()), ) - w.storage.Put(w.ctx.Epoch(), t) + w.storage.Put(w.ep.Epoch(), t) return nil } -func (w *DaughterTrustWriter) Close() error { +func (w *DaughterTrustWriter) Close(context.Context) error { return nil } -func (s *DaughterStorageWriterProvider) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) { +func (s *DaughterStorageWriterProvider) InitWriter(ep reputationcommon.EpochProvider) (reputationcommon.Writer, error) { return &DaughterTrustWriter{ log: s.Log, storage: s.Storage, - ctx: ctx, + ep: ep, }, nil } diff --git a/cmd/frostfs-node/reputation/intermediate/remote.go b/cmd/frostfs-node/reputation/intermediate/remote.go index 224da943..b1a218b9 100644 --- a/cmd/frostfs-node/reputation/intermediate/remote.go +++ b/cmd/frostfs-node/reputation/intermediate/remote.go @@ -1,6 +1,7 @@ package intermediate import ( + "context" "crypto/ecdsa" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/common" @@ -64,32 +65,32 @@ type TrustWriterProvider struct { log *logger.Logger } -func (twp *TrustWriterProvider) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) { - eiContext, ok := ctx.(eigentrustcalc.Context) +func (twp *TrustWriterProvider) InitWriter(ep reputationcommon.EpochProvider) (reputationcommon.Writer, error) { + iterInfo, ok := ep.(eigentrustcalc.EpochIterationInfo) if !ok { // TODO: #1164 think if this can be done without such limitation panic(ErrIncorrectContextPanicMsg) } return &RemoteTrustWriter{ - eiCtx: eiContext, - client: twp.client, - key: twp.key, - log: twp.log, + iterInfo: iterInfo, + client: twp.client, + key: twp.key, + log: twp.log, }, nil } type RemoteTrustWriter struct { - eiCtx eigentrustcalc.Context - client coreclient.Client - key *ecdsa.PrivateKey - log *logger.Logger + iterInfo eigentrustcalc.EpochIterationInfo + client coreclient.Client + key *ecdsa.PrivateKey + log *logger.Logger } // Write sends a trust value to a remote node via ReputationService.AnnounceIntermediateResult RPC. -func (rtp *RemoteTrustWriter) Write(t reputation.Trust) error { - epoch := rtp.eiCtx.Epoch() - i := rtp.eiCtx.I() +func (rtp *RemoteTrustWriter) Write(ctx context.Context, t reputation.Trust) error { + epoch := rtp.iterInfo.Epoch() + i := rtp.iterInfo.I() rtp.log.Debug("announcing trust", zap.Uint64("epoch", epoch), @@ -108,17 +109,16 @@ func (rtp *RemoteTrustWriter) Write(t reputation.Trust) error { var p internalclient.AnnounceIntermediatePrm - p.SetContext(rtp.eiCtx) p.SetClient(rtp.client) p.SetEpoch(epoch) p.SetIteration(i) p.SetTrust(apiPeerToPeerTrust) - _, err := internalclient.AnnounceIntermediate(p) + _, err := internalclient.AnnounceIntermediate(ctx, p) return err } -func (rtp *RemoteTrustWriter) Close() error { +func (rtp *RemoteTrustWriter) Close(context.Context) error { return nil } diff --git a/cmd/frostfs-node/reputation/intermediate/storage.go b/cmd/frostfs-node/reputation/intermediate/storage.go index 0f614d9f..db29ff92 100644 --- a/cmd/frostfs-node/reputation/intermediate/storage.go +++ b/cmd/frostfs-node/reputation/intermediate/storage.go @@ -18,7 +18,7 @@ type DaughterTrustIteratorProvider struct { // InitDaughterIterator returns an iterator over the received // local trusts for ctx.Epoch() epoch from daughter p. -func (ip *DaughterTrustIteratorProvider) InitDaughterIterator(ctx eigentrustcalc.Context, +func (ip *DaughterTrustIteratorProvider) InitDaughterIterator(ctx eigentrustcalc.EpochIterationInfo, p apireputation.PeerID) (eigentrustcalc.TrustIterator, error) { epoch := ctx.Epoch() @@ -34,7 +34,7 @@ func (ip *DaughterTrustIteratorProvider) InitDaughterIterator(ctx eigentrustcalc // daughters of the current node(manager) and all local // trusts received from them for ctx.Epoch() epoch. func (ip *DaughterTrustIteratorProvider) InitAllDaughtersIterator( - ctx eigentrustcalc.Context) (eigentrustcalc.PeerTrustsIterator, error) { + ctx eigentrustcalc.EpochIterationInfo) (eigentrustcalc.PeerTrustsIterator, error) { epoch := ctx.Epoch() iter, ok := ip.DaughterStorage.AllDaughterTrusts(epoch) @@ -49,7 +49,7 @@ func (ip *DaughterTrustIteratorProvider) InitAllDaughtersIterator( // of the current node(manager) and all their consumers' local // trusts for ctx.Epoch() epoch and ctx.I() iteration. func (ip *DaughterTrustIteratorProvider) InitConsumersIterator( - ctx eigentrustcalc.Context) (eigentrustcalc.PeerTrustsIterator, error) { + ctx eigentrustcalc.EpochIterationInfo) (eigentrustcalc.PeerTrustsIterator, error) { epoch, iter := ctx.Epoch(), ctx.I() consumerIterator, ok := ip.ConsumerStorage.Consumers(epoch, iter) diff --git a/cmd/frostfs-node/reputation/internal/client/client.go b/cmd/frostfs-node/reputation/internal/client/client.go index 22fd21d4..ff513126 100644 --- a/cmd/frostfs-node/reputation/internal/client/client.go +++ b/cmd/frostfs-node/reputation/internal/client/client.go @@ -9,11 +9,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation" ) -// nolint: containedctx type commonPrm struct { cli coreclient.Client - - ctx context.Context } // SetClient sets the base client for FrostFS API communication. @@ -23,13 +20,6 @@ func (x *commonPrm) SetClient(cli coreclient.Client) { x.cli = cli } -// SetContext sets context.Context for network communication. -// -// Required parameter. -func (x *commonPrm) SetContext(ctx context.Context) { - x.ctx = ctx -} - // AnnounceLocalPrm groups parameters of AnnounceLocal operation. type AnnounceLocalPrm struct { commonPrm @@ -55,10 +45,10 @@ type AnnounceLocalRes struct{} // Client, context and key must be set. // // Returns any error which prevented the operation from completing correctly in error return. -func AnnounceLocal(prm AnnounceLocalPrm) (res AnnounceLocalRes, err error) { +func AnnounceLocal(ctx context.Context, prm AnnounceLocalPrm) (res AnnounceLocalRes, err error) { var cliRes *client.ResAnnounceLocalTrust - cliRes, err = prm.cli.AnnounceLocalTrust(prm.ctx, prm.cliPrm) + cliRes, err = prm.cli.AnnounceLocalTrust(ctx, prm.cliPrm) if err == nil { // pull out an error from status err = apistatus.ErrFromStatus(cliRes.Status()) @@ -98,10 +88,10 @@ type AnnounceIntermediateRes struct{} // Client, context and key must be set. // // Returns any error which prevented the operation from completing correctly in error return. -func AnnounceIntermediate(prm AnnounceIntermediatePrm) (res AnnounceIntermediateRes, err error) { +func AnnounceIntermediate(ctx context.Context, prm AnnounceIntermediatePrm) (res AnnounceIntermediateRes, err error) { var cliRes *client.ResAnnounceIntermediateTrust - cliRes, err = prm.cli.AnnounceIntermediateTrust(prm.ctx, prm.cliPrm) + cliRes, err = prm.cli.AnnounceIntermediateTrust(ctx, prm.cliPrm) if err == nil { // pull out an error from status err = apistatus.ErrFromStatus(cliRes.Status()) diff --git a/cmd/frostfs-node/reputation/local/remote.go b/cmd/frostfs-node/reputation/local/remote.go index 2fa93ff6..3c929a9c 100644 --- a/cmd/frostfs-node/reputation/local/remote.go +++ b/cmd/frostfs-node/reputation/local/remote.go @@ -1,6 +1,7 @@ package local import ( + "context" "crypto/ecdsa" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/reputation/common" @@ -63,9 +64,9 @@ type TrustWriterProvider struct { log *logger.Logger } -func (twp *TrustWriterProvider) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) { +func (twp *TrustWriterProvider) InitWriter(ep reputationcommon.EpochProvider) (reputationcommon.Writer, error) { return &RemoteTrustWriter{ - ctx: ctx, + ep: ep, client: twp.client, key: twp.key, log: twp.log, @@ -73,7 +74,7 @@ func (twp *TrustWriterProvider) InitWriter(ctx reputationcommon.Context) (reputa } type RemoteTrustWriter struct { - ctx reputationcommon.Context + ep reputationcommon.EpochProvider client coreclient.Client key *ecdsa.PrivateKey log *logger.Logger @@ -81,7 +82,7 @@ type RemoteTrustWriter struct { buf []reputationapi.Trust } -func (rtp *RemoteTrustWriter) Write(t reputation.Trust) error { +func (rtp *RemoteTrustWriter) Write(_ context.Context, t reputation.Trust) error { var apiTrust reputationapi.Trust apiTrust.SetValue(t.Value().Float64()) @@ -92,8 +93,8 @@ func (rtp *RemoteTrustWriter) Write(t reputation.Trust) error { return nil } -func (rtp *RemoteTrustWriter) Close() error { - epoch := rtp.ctx.Epoch() +func (rtp *RemoteTrustWriter) Close(ctx context.Context) error { + epoch := rtp.ep.Epoch() rtp.log.Debug("announcing trusts", zap.Uint64("epoch", epoch), @@ -101,12 +102,11 @@ func (rtp *RemoteTrustWriter) Close() error { var prm internalclient.AnnounceLocalPrm - prm.SetContext(rtp.ctx) prm.SetClient(rtp.client) prm.SetEpoch(epoch) prm.SetTrusts(rtp.buf) - _, err := internalclient.AnnounceLocal(prm) + _, err := internalclient.AnnounceLocal(ctx, prm) return err } diff --git a/cmd/frostfs-node/reputation/local/storage.go b/cmd/frostfs-node/reputation/local/storage.go index 92d10dfe..86115187 100644 --- a/cmd/frostfs-node/reputation/local/storage.go +++ b/cmd/frostfs-node/reputation/local/storage.go @@ -24,8 +24,8 @@ type TrustStorage struct { LocalKey []byte } -func (s *TrustStorage) InitIterator(ctx reputationcommon.Context) (trustcontroller.Iterator, error) { - epoch := ctx.Epoch() +func (s *TrustStorage) InitIterator(ep reputationcommon.EpochProvider) (trustcontroller.Iterator, error) { + epoch := ep.Epoch() s.Log.Debug("initializing iterator over trusts", zap.Uint64("epoch", epoch), @@ -37,14 +37,14 @@ func (s *TrustStorage) InitIterator(ctx reputationcommon.Context) (trustcontroll } return &TrustIterator{ - ctx: ctx, + ep: ep, storage: s, epochStorage: epochStorage, }, nil } type TrustIterator struct { - ctx reputationcommon.Context + ep reputationcommon.EpochProvider storage *TrustStorage @@ -59,7 +59,7 @@ func (it *TrustIterator) Iterate(h reputation.TrustHandler) error { } } - nm, err := it.storage.NmSrc.GetNetMapByEpoch(it.ctx.Epoch()) + nm, err := it.storage.NmSrc.GetNetMapByEpoch(it.ep.Epoch()) if err != nil { return err } diff --git a/pkg/services/reputation/common/deps.go b/pkg/services/reputation/common/deps.go index ebb227b5..3ea5aa88 100644 --- a/pkg/services/reputation/common/deps.go +++ b/pkg/services/reputation/common/deps.go @@ -2,17 +2,12 @@ package common import ( "context" - "io" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation" apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation" ) -// Context wraps stdlib context -// with accompanying meta values. -type Context interface { - context.Context - +type EpochProvider interface { // Must return epoch number to select the values. Epoch() uint64 } @@ -30,7 +25,7 @@ type Writer interface { // Close operation. // // Write must not be called after Close. - Write(reputation.Trust) error + Write(context.Context, reputation.Trust) error // Close exits with method-providing Writer. // @@ -38,7 +33,7 @@ type Writer interface { // the Close's return. // // Methods must not be called after Close. - io.Closer + Close(context.Context) error } // WriterProvider is a group of methods provided @@ -52,7 +47,7 @@ type WriterProvider interface { // // Implementations can have different logic for different // contexts, so specific ones may document their own behavior. - InitWriter(Context) (Writer, error) + InitWriter(EpochProvider) (Writer, error) } // ManagerBuilder defines an interface for providing a list diff --git a/pkg/services/reputation/common/router/calls.go b/pkg/services/reputation/common/router/calls.go index 75cdf56e..a177f6a2 100644 --- a/pkg/services/reputation/common/router/calls.go +++ b/pkg/services/reputation/common/router/calls.go @@ -1,6 +1,7 @@ package router import ( + "context" "encoding/hex" "sync" @@ -9,27 +10,27 @@ import ( "go.uber.org/zap" ) -// routeContext wraps context with additional passed +// RouteInfo wraps epoch provider with additional passed // route data. It is only used inside Router and is // not passed in any external methods. -type routeContext struct { - common.Context +type RouteInfo struct { + common.EpochProvider passedRoute []common.ServerInfo } -// NewRouteContext wraps the main context of value passing with its traversal route and epoch. -func NewRouteContext(ctx common.Context, passed []common.ServerInfo) common.Context { - return &routeContext{ - Context: ctx, - passedRoute: passed, +// NewRouteInfo wraps the main context of value passing with its traversal route and epoch. +func NewRouteInfo(ep common.EpochProvider, passed []common.ServerInfo) *RouteInfo { + return &RouteInfo{ + EpochProvider: ep, + passedRoute: passed, } } type trustWriter struct { router *Router - routeCtx *routeContext + routeInfo *RouteInfo routeMtx sync.RWMutex mServers map[string]common.Writer @@ -37,7 +38,7 @@ type trustWriter struct { // InitWriter initializes and returns Writer that sends each value to its next route point. // -// If ctx was created by NewRouteContext, then the traversed route is taken into account, +// If ep was created by NewRouteInfo, then the traversed route is taken into account, // and the value will be sent to its continuation. Otherwise, the route will be laid // from scratch and the value will be sent to its primary point. // @@ -49,31 +50,31 @@ type trustWriter struct { // runtime and never returns an error. // // Always returns nil error. -func (r *Router) InitWriter(ctx common.Context) (common.Writer, error) { +func (r *Router) InitWriter(ep common.EpochProvider) (common.Writer, error) { var ( - routeCtx *routeContext - ok bool + routeInfo *RouteInfo + ok bool ) - if routeCtx, ok = ctx.(*routeContext); !ok { - routeCtx = &routeContext{ - Context: ctx, - passedRoute: []common.ServerInfo{r.localSrvInfo}, + if routeInfo, ok = ep.(*RouteInfo); !ok { + routeInfo = &RouteInfo{ + EpochProvider: ep, + passedRoute: []common.ServerInfo{r.localSrvInfo}, } } return &trustWriter{ - router: r, - routeCtx: routeCtx, - mServers: make(map[string]common.Writer), + router: r, + routeInfo: routeInfo, + mServers: make(map[string]common.Writer), }, nil } -func (w *trustWriter) Write(t reputation.Trust) error { +func (w *trustWriter) Write(ctx context.Context, t reputation.Trust) error { w.routeMtx.Lock() defer w.routeMtx.Unlock() - route, err := w.router.routeBuilder.NextStage(w.routeCtx.Epoch(), t, w.routeCtx.passedRoute) + route, err := w.router.routeBuilder.NextStage(w.routeInfo.Epoch(), t, w.routeInfo.passedRoute) if err != nil { return err } else if len(route) == 0 { @@ -99,7 +100,7 @@ func (w *trustWriter) Write(t reputation.Trust) error { } // init writer with original context wrapped in routeContext - remoteWriter, err = provider.InitWriter(w.routeCtx.Context) + remoteWriter, err = provider.InitWriter(w.routeInfo.EpochProvider) if err != nil { w.router.log.Debug("could not initialize writer", zap.String("error", err.Error()), @@ -111,7 +112,7 @@ func (w *trustWriter) Write(t reputation.Trust) error { w.mServers[key] = remoteWriter } - err := remoteWriter.Write(t) + err := remoteWriter.Write(ctx, t) if err != nil { w.router.log.Debug("could not write the value", zap.String("error", err.Error()), @@ -122,9 +123,9 @@ func (w *trustWriter) Write(t reputation.Trust) error { return nil } -func (w *trustWriter) Close() error { +func (w *trustWriter) Close(ctx context.Context) error { for key, wRemote := range w.mServers { - err := wRemote.Close() + err := wRemote.Close(ctx) if err != nil { w.router.log.Debug("could not close remote server writer", zap.String("key", key), diff --git a/pkg/services/reputation/eigentrust/calculator/calls.go b/pkg/services/reputation/eigentrust/calculator/calls.go index 23e41872..2cc78978 100644 --- a/pkg/services/reputation/eigentrust/calculator/calls.go +++ b/pkg/services/reputation/eigentrust/calculator/calls.go @@ -23,7 +23,7 @@ func (p *CalculatePrm) SetEpochIteration(ei eigentrust.EpochIteration) { p.ei = ei } -func (c *Calculator) Calculate(prm CalculatePrm) { +func (c *Calculator) Calculate(ctx context.Context, prm CalculatePrm) { alpha, err := c.prm.AlphaProvider.EigenTrustAlpha() if err != nil { c.opts.log.Debug( @@ -36,28 +36,25 @@ func (c *Calculator) Calculate(prm CalculatePrm) { c.alpha = reputation.TrustValueFromFloat64(alpha) c.beta = reputation.TrustValueFromFloat64(1 - alpha) - ctx := eigentrust.IterContext{ - Context: context.Background(), - EpochIteration: prm.ei, - } + epochIteration := prm.ei - iter := ctx.I() + iter := epochIteration.I() log := c.opts.log.With( - zap.Uint64("epoch", ctx.Epoch()), + zap.Uint64("epoch", epochIteration.Epoch()), zap.Uint32("iteration", iter), ) if iter == 0 { - c.sendInitialValues(ctx) + c.sendInitialValues(ctx, epochIteration) return } // decrement iteration number to select the values collected // on the previous stage - ctx.SetI(iter - 1) + epochIteration.SetI(iter - 1) - consumersIter, err := c.prm.DaughterTrustSource.InitConsumersIterator(ctx) + consumersIter, err := c.prm.DaughterTrustSource.InitConsumersIterator(epochIteration) if err != nil { log.Debug("consumers trust iterator's init failure", zap.String("error", err.Error()), @@ -67,13 +64,13 @@ func (c *Calculator) Calculate(prm CalculatePrm) { } // continue with initial iteration number - ctx.SetI(iter) + epochIteration.SetI(iter) err = consumersIter.Iterate(func(daughter apireputation.PeerID, iter TrustIterator) error { err := c.prm.WorkerPool.Submit(func() { - c.iterateDaughter(iterDaughterPrm{ + c.iterateDaughter(ctx, iterDaughterPrm{ lastIter: prm.last, - ctx: ctx, + ei: epochIteration, id: daughter, consumersIter: iter, }) @@ -97,7 +94,7 @@ func (c *Calculator) Calculate(prm CalculatePrm) { type iterDaughterPrm struct { lastIter bool - ctx Context + ei EpochIterationInfo id apireputation.PeerID @@ -105,7 +102,7 @@ type iterDaughterPrm struct { } // nolint: funlen -func (c *Calculator) iterateDaughter(p iterDaughterPrm) { +func (c *Calculator) iterateDaughter(ctx context.Context, p iterDaughterPrm) { initTrust, err := c.prm.InitialTrustSource.InitialTrust(p.id) if err != nil { c.opts.log.Debug("get initial trust failure", @@ -116,7 +113,7 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { return } - daughterIter, err := c.prm.DaughterTrustSource.InitDaughterIterator(p.ctx, p.id) + daughterIter, err := c.prm.DaughterTrustSource.InitDaughterIterator(p.ei, p.id) if err != nil { c.opts.log.Debug("daughter trust iterator's init failure", zap.String("error", err.Error()), @@ -130,8 +127,8 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { err = p.consumersIter.Iterate(func(trust reputation.Trust) error { if !p.lastIter { select { - case <-p.ctx.Done(): - return p.ctx.Err() + case <-ctx.Done(): + return ctx.Err() default: } } @@ -155,12 +152,12 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { var intermediateTrust eigentrust.IterationTrust - intermediateTrust.SetEpoch(p.ctx.Epoch()) + intermediateTrust.SetEpoch(p.ei.Epoch()) intermediateTrust.SetPeer(p.id) - intermediateTrust.SetI(p.ctx.I()) + intermediateTrust.SetI(p.ei.I()) if p.lastIter { - finalWriter, err := c.prm.FinalResultTarget.InitIntermediateWriter(p.ctx) + finalWriter, err := c.prm.FinalResultTarget.InitIntermediateWriter(p.ei) if err != nil { c.opts.log.Debug("init writer failure", zap.String("error", err.Error()), @@ -180,7 +177,7 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { return } } else { - intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(p.ctx) + intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(p.ei) if err != nil { c.opts.log.Debug("init writer failure", zap.String("error", err.Error()), @@ -191,8 +188,8 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { err = daughterIter.Iterate(func(trust reputation.Trust) error { select { - case <-p.ctx.Done(): - return p.ctx.Err() + case <-ctx.Done(): + return ctx.Err() default: } @@ -201,7 +198,7 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { trust.SetValue(val) - err := intermediateWriter.Write(trust) + err := intermediateWriter.Write(ctx, trust) if err != nil { c.opts.log.Debug("write value failure", zap.String("error", err.Error()), @@ -216,7 +213,7 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { ) } - err = intermediateWriter.Close() + err = intermediateWriter.Close(ctx) if err != nil { c.opts.log.Error( "could not close writer", @@ -226,8 +223,8 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { } } -func (c *Calculator) sendInitialValues(ctx Context) { - daughterIter, err := c.prm.DaughterTrustSource.InitAllDaughtersIterator(ctx) +func (c *Calculator) sendInitialValues(ctx context.Context, epochInfo EpochIterationInfo) { + daughterIter, err := c.prm.DaughterTrustSource.InitAllDaughtersIterator(epochInfo) if err != nil { c.opts.log.Debug("all daughters trust iterator's init failure", zap.String("error", err.Error()), @@ -236,7 +233,7 @@ func (c *Calculator) sendInitialValues(ctx Context) { return } - intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(ctx) + intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(epochInfo) if err != nil { c.opts.log.Debug("init writer failure", zap.String("error", err.Error()), @@ -263,7 +260,7 @@ func (c *Calculator) sendInitialValues(ctx Context) { initTrust.Mul(trust.Value()) trust.SetValue(initTrust) - err = intermediateWriter.Write(trust) + err = intermediateWriter.Write(ctx, trust) if err != nil { c.opts.log.Debug("write value failure", zap.String("error", err.Error()), @@ -281,7 +278,7 @@ func (c *Calculator) sendInitialValues(ctx Context) { ) } - err = intermediateWriter.Close() + err = intermediateWriter.Close(ctx) if err != nil { c.opts.log.Debug("could not close writer", zap.String("error", err.Error()), diff --git a/pkg/services/reputation/eigentrust/calculator/deps.go b/pkg/services/reputation/eigentrust/calculator/deps.go index 66d3fd30..a22d1df7 100644 --- a/pkg/services/reputation/eigentrust/calculator/deps.go +++ b/pkg/services/reputation/eigentrust/calculator/deps.go @@ -1,16 +1,12 @@ package eigentrustcalc import ( - "context" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust" apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation" ) -type Context interface { - context.Context - +type EpochIterationInfo interface { // Must return epoch number to select the values // for global trust calculation. Epoch() uint64 @@ -43,19 +39,19 @@ type PeerTrustsIterator interface { type DaughterTrustIteratorProvider interface { // InitDaughterIterator must init TrustIterator // that iterates over received local trusts from - // daughter p for ctx.Epoch() epoch. - InitDaughterIterator(ctx Context, p apireputation.PeerID) (TrustIterator, error) + // daughter p for epochInfo.Epoch() epoch. + InitDaughterIterator(epochInfo EpochIterationInfo, p apireputation.PeerID) (TrustIterator, error) // InitAllDaughtersIterator must init PeerTrustsIterator // that must iterate over all daughters of the current // node(manager) and all trusts received from them for - // ctx.Epoch() epoch. - InitAllDaughtersIterator(ctx Context) (PeerTrustsIterator, error) + // epochInfo.Epoch() epoch. + InitAllDaughtersIterator(epochInfo EpochIterationInfo) (PeerTrustsIterator, error) // InitConsumersIterator must init PeerTrustsIterator // that must iterate over all daughters of the current // node(manager) and their consumers' trusts received - // from other managers for ctx.Epoch() epoch and - // ctx.I() iteration. - InitConsumersIterator(Context) (PeerTrustsIterator, error) + // from other managers for epochInfo.Epoch() epoch and + // epochInfo.I() iteration. + InitConsumersIterator(EpochIterationInfo) (PeerTrustsIterator, error) } // IntermediateWriter must write intermediate result to contract. @@ -68,7 +64,7 @@ type IntermediateWriter interface { // IntermediateWriterProvider must provide ready-to-work // IntermediateWriter. type IntermediateWriterProvider interface { - InitIntermediateWriter(Context) (IntermediateWriter, error) + InitIntermediateWriter(EpochIterationInfo) (IntermediateWriter, error) } // AlphaProvider must provide information about required diff --git a/pkg/services/reputation/eigentrust/controller/calls.go b/pkg/services/reputation/eigentrust/controller/calls.go index faf953ae..1753a430 100644 --- a/pkg/services/reputation/eigentrust/controller/calls.go +++ b/pkg/services/reputation/eigentrust/controller/calls.go @@ -1,6 +1,8 @@ package eigentrustctrl import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust" "go.uber.org/zap" ) @@ -22,7 +24,7 @@ func (x iterContext) Last() bool { } // Continue moves the global reputation calculator to the next iteration. -func (c *Controller) Continue(prm ContinuePrm) { +func (c *Controller) Continue(ctx context.Context, prm ContinuePrm) { c.mtx.Lock() { @@ -46,7 +48,7 @@ func (c *Controller) Continue(prm ContinuePrm) { iterCtx.last = iterCtx.I() == iterCtx.iterationNumber-1 err := c.prm.WorkerPool.Submit(func() { - c.prm.DaughtersTrustCalculator.Calculate(iterCtx) + c.prm.DaughtersTrustCalculator.Calculate(ctx, iterCtx) // iteration++ iterCtx.Increment() diff --git a/pkg/services/reputation/eigentrust/controller/deps.go b/pkg/services/reputation/eigentrust/controller/deps.go index 2aeafb61..c068f7cc 100644 --- a/pkg/services/reputation/eigentrust/controller/deps.go +++ b/pkg/services/reputation/eigentrust/controller/deps.go @@ -1,5 +1,7 @@ package eigentrustctrl +import "context" + // IterationContext is a context of the i-th // stage of iterative EigenTrust algorithm. type IterationContext interface { @@ -25,7 +27,7 @@ type DaughtersTrustCalculator interface { // http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Ch.5.1. // // Execution should be interrupted if ctx.Last(). - Calculate(ctx IterationContext) + Calculate(ctx context.Context, iter IterationContext) } // IterationsProvider must provide information about numbers diff --git a/pkg/services/reputation/eigentrust/iteration.go b/pkg/services/reputation/eigentrust/iteration.go index b06064f7..e4793f04 100644 --- a/pkg/services/reputation/eigentrust/iteration.go +++ b/pkg/services/reputation/eigentrust/iteration.go @@ -1,8 +1,6 @@ package eigentrust import ( - "context" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation" ) @@ -36,22 +34,11 @@ type IterationTrust struct { reputation.Trust } -// IterContext aggregates context and data required for -// iterations. -// nolint: containedctx -type IterContext struct { - context.Context - EpochIteration -} - -func NewIterContext(ctx context.Context, epoch uint64, iter uint32) *IterContext { +func NewEpochIteration(epoch uint64, iter uint32) *EpochIteration { ei := EpochIteration{} ei.SetI(iter) ei.SetEpoch(epoch) - return &IterContext{ - Context: ctx, - EpochIteration: ei, - } + return &ei } diff --git a/pkg/services/reputation/local/controller/calls.go b/pkg/services/reputation/local/controller/calls.go index 98815492..80fa772d 100644 --- a/pkg/services/reputation/local/controller/calls.go +++ b/pkg/services/reputation/local/controller/calls.go @@ -27,80 +27,75 @@ func (p *ReportPrm) SetEpoch(e uint64) { // // Each call acquires a report context for an Epoch parameter. // At the very end of the operation, the context is released. -func (c *Controller) Report(prm ReportPrm) { +func (c *Controller) Report(ctx context.Context, prm ReportPrm) { // acquire report - reportCtx := c.acquireReport(prm.epoch) - if reportCtx == nil { + rCtx, reporter := c.acquireReporter(ctx, prm.epoch) + if reporter == nil { return } // report local trust values - reportCtx.report() + reporter.report(rCtx) // finally stop and free the report - c.freeReport(prm.epoch, reportCtx.log) + c.freeReport(prm.epoch, reporter.log) } -type reportContext struct { +type reporter struct { epoch uint64 ctrl *Controller log *logger.Logger - ctx common.Context + ep common.EpochProvider } -// nolint: containedctx -type iteratorContext struct { - context.Context - +type epochProvider struct { epoch uint64 } -func (c iteratorContext) Epoch() uint64 { +func (c epochProvider) Epoch() uint64 { return c.epoch } -func (c *Controller) acquireReport(epoch uint64) *reportContext { - var ctx context.Context +func (c *Controller) acquireReporter(ctx context.Context, epoch uint64) (context.Context, *reporter) { + started := true c.mtx.Lock() - { if cancel := c.mCtx[epoch]; cancel == nil { - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(ctx) c.mCtx[epoch] = cancel + started = false } } - c.mtx.Unlock() log := &logger.Logger{Logger: c.opts.log.With( zap.Uint64("epoch", epoch), )} - if ctx == nil { + if started { log.Debug("report is already started") - return nil + return ctx, nil } - return &reportContext{ + return ctx, &reporter{ epoch: epoch, ctrl: c, log: log, - ctx: &iteratorContext{ - Context: ctx, - epoch: epoch, + ep: &epochProvider{ + epoch: epoch, }, } } -func (c *reportContext) report() { +func (c *reporter) report(ctx context.Context) { c.log.Debug("starting to report local trust values") // initialize iterator over locally collected values - iterator, err := c.ctrl.prm.LocalTrustSource.InitIterator(c.ctx) + iterator, err := c.ctrl.prm.LocalTrustSource.InitIterator(c.ep) if err != nil { c.log.Debug("could not initialize iterator over local trust values", zap.String("error", err.Error()), @@ -110,7 +105,7 @@ func (c *reportContext) report() { } // initialize target of local trust values - targetWriter, err := c.ctrl.prm.LocalTrustTarget.InitWriter(c.ctx) + targetWriter, err := c.ctrl.prm.LocalTrustTarget.InitWriter(c.ep) if err != nil { c.log.Debug("could not initialize local trust target", zap.String("error", err.Error()), @@ -123,11 +118,11 @@ func (c *reportContext) report() { err = iterator.Iterate( func(t reputation.Trust) error { // check if context is done - if err := c.ctx.Err(); err != nil { + if err := ctx.Err(); err != nil { return err } - return targetWriter.Write(t) + return targetWriter.Write(ctx, t) }, ) if err != nil && !errors.Is(err, context.Canceled) { @@ -139,7 +134,7 @@ func (c *reportContext) report() { } // finish writing - err = targetWriter.Close() + err = targetWriter.Close(ctx) if err != nil { c.log.Debug("could not finish writing local trust values", zap.String("error", err.Error()), diff --git a/pkg/services/reputation/local/controller/controller.go b/pkg/services/reputation/local/controller/controller.go index 7bf56be8..373df36d 100644 --- a/pkg/services/reputation/local/controller/controller.go +++ b/pkg/services/reputation/local/controller/controller.go @@ -5,7 +5,7 @@ import ( "fmt" "sync" - reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common" + reputationrouter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common/router" ) // Prm groups the required parameters of the Controller's constructor. @@ -24,7 +24,7 @@ type Prm struct { // trust to other nodes. // // Must not be nil. - LocalTrustTarget reputationcommon.WriterProvider + LocalTrustTarget *reputationrouter.Router } // Controller represents main handler for starting diff --git a/pkg/services/reputation/local/controller/deps.go b/pkg/services/reputation/local/controller/deps.go index 3ab72eb5..6f4a29c9 100644 --- a/pkg/services/reputation/local/controller/deps.go +++ b/pkg/services/reputation/local/controller/deps.go @@ -30,5 +30,5 @@ type IteratorProvider interface { // // Implementations can have different logic for different // contexts, so specific ones may document their own behavior. - InitIterator(common.Context) (Iterator, error) + InitIterator(common.EpochProvider) (Iterator, error) } diff --git a/pkg/services/reputation/local/controller/util.go b/pkg/services/reputation/local/controller/util.go index 97b9e3a6..12255049 100644 --- a/pkg/services/reputation/local/controller/util.go +++ b/pkg/services/reputation/local/controller/util.go @@ -7,11 +7,11 @@ type storageWrapper struct { i Iterator } -func (s storageWrapper) InitIterator(common.Context) (Iterator, error) { +func (s storageWrapper) InitIterator(common.EpochProvider) (Iterator, error) { return s.i, nil } -func (s storageWrapper) InitWriter(common.Context) (common.Writer, error) { +func (s storageWrapper) InitWriter(common.EpochProvider) (common.Writer, error) { return s.w, nil }