From 8879c6ea4aa5431bdfb25e6fca7c15b9e339c6b1 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 25 Apr 2023 09:16:14 +0300 Subject: [PATCH] [#248] innerring: Remove audit and settlement code Signed-off-by: Evgenii Stratonikov --- pkg/innerring/blocktimer.go | 39 -- pkg/innerring/contracts.go | 2 - pkg/innerring/initialization.go | 147 +------- pkg/innerring/innerring.go | 12 - pkg/innerring/internal/client/client.go | 339 ------------------ pkg/innerring/internal/client/doc.go | 12 - pkg/innerring/internal/client/prm.go | 18 - pkg/innerring/processors/audit/events.go | 19 - pkg/innerring/processors/audit/handlers.go | 22 -- pkg/innerring/processors/audit/process.go | 217 ----------- pkg/innerring/processors/audit/processor.go | 143 -------- pkg/innerring/processors/audit/scheduler.go | 65 ---- .../processors/audit/scheduler_test.go | 106 ------ .../processors/netmap/process_epoch.go | 4 - pkg/innerring/processors/netmap/processor.go | 19 +- .../processors/settlement/audit/calculate.go | 336 ----------------- .../processors/settlement/audit/calculator.go | 48 --- .../processors/settlement/audit/prm.go | 49 --- .../processors/settlement/basic/collect.go | 114 ------ .../processors/settlement/basic/context.go | 80 ----- .../processors/settlement/basic/distribute.go | 59 --- .../settlement/basic/distribute_test.go | 54 --- .../processors/settlement/basic/util.go | 44 --- pkg/innerring/processors/settlement/calls.go | 133 ------- .../processors/settlement/common/details.go | 33 -- .../settlement/common/details_test.go | 28 -- .../processors/settlement/common/types.go | 54 --- .../processors/settlement/common/util.go | 74 ---- pkg/innerring/processors/settlement/deps.go | 17 - pkg/innerring/processors/settlement/events.go | 46 --- .../processors/settlement/handlers.go | 22 -- pkg/innerring/processors/settlement/opts.go | 31 -- .../processors/settlement/processor.go | 79 ---- pkg/innerring/rpc.go | 236 ------------ pkg/innerring/settlement.go | 301 ---------------- pkg/innerring/state.go | 14 - 36 files changed, 8 insertions(+), 3008 deletions(-) delete mode 100644 pkg/innerring/internal/client/client.go delete mode 100644 pkg/innerring/internal/client/doc.go delete mode 100644 pkg/innerring/internal/client/prm.go delete mode 100644 pkg/innerring/processors/audit/events.go delete mode 100644 pkg/innerring/processors/audit/handlers.go delete mode 100644 pkg/innerring/processors/audit/process.go delete mode 100644 pkg/innerring/processors/audit/processor.go delete mode 100644 pkg/innerring/processors/audit/scheduler.go delete mode 100644 pkg/innerring/processors/audit/scheduler_test.go delete mode 100644 pkg/innerring/processors/settlement/audit/calculate.go delete mode 100644 pkg/innerring/processors/settlement/audit/calculator.go delete mode 100644 pkg/innerring/processors/settlement/audit/prm.go delete mode 100644 pkg/innerring/processors/settlement/basic/collect.go delete mode 100644 pkg/innerring/processors/settlement/basic/context.go delete mode 100644 pkg/innerring/processors/settlement/basic/distribute.go delete mode 100644 pkg/innerring/processors/settlement/basic/distribute_test.go delete mode 100644 pkg/innerring/processors/settlement/basic/util.go delete mode 100644 pkg/innerring/processors/settlement/calls.go delete mode 100644 pkg/innerring/processors/settlement/common/details.go delete mode 100644 pkg/innerring/processors/settlement/common/details_test.go delete mode 100644 pkg/innerring/processors/settlement/common/types.go delete mode 100644 pkg/innerring/processors/settlement/common/util.go delete mode 100644 pkg/innerring/processors/settlement/deps.go delete mode 100644 pkg/innerring/processors/settlement/events.go delete mode 100644 pkg/innerring/processors/settlement/handlers.go delete mode 100644 pkg/innerring/processors/settlement/opts.go delete mode 100644 pkg/innerring/processors/settlement/processor.go delete mode 100644 pkg/innerring/rpc.go delete mode 100644 pkg/innerring/settlement.go diff --git a/pkg/innerring/blocktimer.go b/pkg/innerring/blocktimer.go index 94e26209..6a4dee80 100644 --- a/pkg/innerring/blocktimer.go +++ b/pkg/innerring/blocktimer.go @@ -5,10 +5,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement" timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/timer" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/nspcc-dev/neo-go/pkg/util" @@ -25,12 +23,6 @@ type ( IsAlphabet() bool } - subEpochEventHandler struct { - handler event.Handler // handle to execute - durationMul uint32 // X: X/Y of epoch in blocks - durationDiv uint32 // Y: X/Y of epoch in blocks - } - newEpochHandler func() epochTimerArgs struct { @@ -45,9 +37,6 @@ type ( stopEstimationDMul uint32 // X: X/Y of epoch in blocks stopEstimationDDiv uint32 // Y: X/Y of epoch in blocks - - collectBasicIncome subEpochEventHandler - distributeBasicIncome subEpochEventHandler } emitTimerArgs struct { @@ -119,34 +108,6 @@ func newEpochTimer(args *epochTimerArgs) *timer.BlockTimer { } }) - epochTimer.OnDelta( - args.collectBasicIncome.durationMul, - args.collectBasicIncome.durationDiv, - func() { - epochN := args.epoch.EpochCounter() - if epochN == 0 { // estimates are invalid in genesis epoch - return - } - - args.collectBasicIncome.handler( - settlement.NewBasicIncomeCollectEvent(epochN - 1), - ) - }) - - epochTimer.OnDelta( - args.distributeBasicIncome.durationMul, - args.distributeBasicIncome.durationDiv, - func() { - epochN := args.epoch.EpochCounter() - if epochN == 0 { // estimates are invalid in genesis epoch - return - } - - args.distributeBasicIncome.handler( - settlement.NewBasicIncomeDistributeEvent(epochN - 1), - ) - }) - return epochTimer } diff --git a/pkg/innerring/contracts.go b/pkg/innerring/contracts.go index def55f22..85a4bf94 100644 --- a/pkg/innerring/contracts.go +++ b/pkg/innerring/contracts.go @@ -15,7 +15,6 @@ type contracts struct { netmap util.Uint160 // in morph balance util.Uint160 // in morph container util.Uint160 // in morph - audit util.Uint160 // in morph proxy util.Uint160 // in morph processing util.Uint160 // in mainnet frostfsID util.Uint160 // in morph @@ -58,7 +57,6 @@ func parseContracts(cfg *viper.Viper, morph *client.Client, withoutMainNet, with {"contracts.netmap", client.NNSNetmapContractName, &result.netmap}, {"contracts.balance", client.NNSBalanceContractName, &result.balance}, {"contracts.container", client.NNSContainerContractName, &result.container}, - {"contracts.audit", client.NNSAuditContractName, &result.audit}, {"contracts.frostfsid", client.NNSFrostFSIDContractName, &result.frostfsID}, } diff --git a/pkg/innerring/initialization.go b/pkg/innerring/initialization.go index 38bde141..cb9b1064 100644 --- a/pkg/innerring/initialization.go +++ b/pkg/innerring/initialization.go @@ -8,7 +8,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance" cont "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs" @@ -17,25 +16,19 @@ import ( nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation" addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress" statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement" - auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" - auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit" balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" - audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager" control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir" controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server" - util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" - "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "go.uber.org/zap" "google.golang.org/grpc" @@ -43,9 +36,7 @@ import ( func (s *Server) initNetmapProcessor(cfg *viper.Viper, cnrClient *container.Client, - alphaSync event.Handler, - auditProcessor *audit.Processor, - settlementProcessor *settlement.Processor) error { + alphaSync event.Handler) error { locodeValidator, err := s.newLocodeValidator(cfg) if err != nil { return err @@ -70,15 +61,9 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper, CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), ContainerWrapper: cnrClient, - HandleAudit: s.onlyActiveEventHandler( - auditProcessor.StartAuditHandler(), - ), NotaryDepositHandler: s.onlyAlphabetEventHandler( s.notaryHandler, ), - AuditSettlementsHandler: s.onlyAlphabetEventHandler( - settlementProcessor.HandleAuditEvent, - ), AlphabetSyncHandler: s.onlyAlphabetEventHandler( alphaSync, ), @@ -171,93 +156,6 @@ func (s *Server) initNotaryConfig() { ) } -func (s *Server) createAuditProcessor(cfg *viper.Viper, clientCache *ClientCache, cnrClient *container.Client) (*audit.Processor, error) { - auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size")) - if err != nil { - return nil, err - } - - pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") - porPoolSize := cfg.GetInt("audit.por.pool_size") - - // create audit processor dependencies - auditTaskManager := audittask.New( - audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), - audittask.WithWorkerPool(auditPool), - audittask.WithLogger(s.log), - audittask.WithContainerCommunicator(clientCache), - audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")), - audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) { - return ants.NewPool(pdpPoolSize) - }), - audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) { - return ants.NewPool(porPoolSize) - }), - ) - - s.workers = append(s.workers, auditTaskManager.Listen) - - // create audit processor - return audit.New(&audit.Params{ - Log: s.log, - NetmapClient: s.netmapClient, - ContainerClient: cnrClient, - IRList: s, - EpochSource: s, - SGSource: clientCache, - Key: &s.key.PrivateKey, - RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), - TaskManager: auditTaskManager, - Reporter: s, - }) -} - -func (s *Server) createSettlementProcessor(clientCache *ClientCache, cnrClient *container.Client) *settlement.Processor { - // create settlement processor dependencies - settlementDeps := settlementDeps{ - log: s.log, - cnrSrc: container.AsContainerSource(cnrClient), - auditClient: s.auditClient, - nmClient: s.netmapClient, - clientCache: clientCache, - balanceClient: s.balanceClient, - } - - settlementDeps.settlementCtx = auditSettlementContext - auditCalcDeps := &auditSettlementDeps{ - settlementDeps: settlementDeps, - } - - settlementDeps.settlementCtx = basicIncomeSettlementContext - basicSettlementDeps := &basicIncomeSettlementDeps{ - settlementDeps: settlementDeps, - cnrClient: cnrClient, - } - - auditSettlementCalc := auditSettlement.NewCalculator( - &auditSettlement.CalculatorPrm{ - ResultStorage: auditCalcDeps, - ContainerStorage: auditCalcDeps, - PlacementCalculator: auditCalcDeps, - SGStorage: auditCalcDeps, - AccountStorage: auditCalcDeps, - Exchanger: auditCalcDeps, - AuditFeeFetcher: s.netmapClient, - }, - auditSettlement.WithLogger(s.log), - ) - - // create settlement processor - return settlement.New( - settlement.Prm{ - AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc), - BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps}, - State: s, - }, - settlement.WithLogger(s.log), - ) -} - func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Client, irf irFetcher) (event.Handler, error) { var alphaSync event.Handler @@ -316,16 +214,6 @@ func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morp epoch: s, stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"), stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"), - collectBasicIncome: subEpochEventHandler{ - handler: processors.SettlementProcessor.HandleIncomeCollectionEvent, - durationMul: cfg.GetUint32("timers.collect_basic_income.mul"), - durationDiv: cfg.GetUint32("timers.collect_basic_income.div"), - }, - distributeBasicIncome: subEpochEventHandler{ - handler: processors.SettlementProcessor.HandleIncomeDistributionEvent, - durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"), - durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"), - }, }) s.addBlockTimer(s.epochTimer) @@ -493,12 +381,6 @@ func (s *Server) initClientsFromMorph() (*serverMorphClients, error) { var err error fee := s.feeConfig.SideChainFee() - // do not use TryNotary() in audit wrapper - // audit operations do not require multisignatures - s.auditClient, err = auditClient.NewFromMorph(s.morphClient, s.contracts.audit, fee) - if err != nil { - return nil, err - } // form morph container client's options morphCnrOpts := make([]container.Option, 0, 3) @@ -545,8 +427,7 @@ func (s *Server) initClientsFromMorph() (*serverMorphClients, error) { } type serverProcessors struct { - AlphabetProcessor *alphabet.Processor - SettlementProcessor *settlement.Processor + AlphabetProcessor *alphabet.Processor } func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) { @@ -561,32 +442,12 @@ func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClien cfg.GetDuration("indexer.cache_timeout"), ) - clientCache := newClientCache(&clientCacheParams{ - Log: s.log, - Key: &s.key.PrivateKey, - SGTimeout: cfg.GetDuration("audit.timeout.get"), - HeadTimeout: cfg.GetDuration("audit.timeout.head"), - RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"), - AllowExternal: cfg.GetBool("audit.allow_external"), - }) - - s.registerNoErrCloser(clientCache.cache.CloseAll) - - // create audit processor - auditProcessor, err := s.createAuditProcessor(cfg, clientCache, morphClients.CnrClient) + alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf) if err != nil { return nil, err } - result.SettlementProcessor = s.createSettlementProcessor(clientCache, morphClients.CnrClient) - - var alphaSync event.Handler - alphaSync, err = s.createAlphaSync(cfg, morphClients.FrostFSClient, irf) - if err != nil { - return nil, err - } - - err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync, auditProcessor, result.SettlementProcessor) + err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync) if err != nil { return nil, err } diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 2cb29b90..deb546f0 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -13,7 +13,6 @@ import ( timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" - auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit" balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" @@ -52,7 +51,6 @@ type ( epochDuration atomic.Uint64 statusIndex *innerRingIndexer precision precision.Fixed8Converter - auditClient *auditClient.Client healthStatus atomic.Value balanceClient *balanceClient.Client netmapClient *nmClient.Client @@ -572,16 +570,6 @@ func (s *Server) nextEpochBlockDelta() (uint32, error) { return delta - blockHeight, nil } -// onlyActiveHandler wrapper around event handler that executes it -// only if inner ring node state is active. -func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler { - return func(ev event.Event) { - if s.IsActive() { - f(ev) - } - } -} - // onlyAlphabet wrapper around event handler that executes it // only if inner ring node is alphabet node. func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler { diff --git a/pkg/innerring/internal/client/client.go b/pkg/innerring/internal/client/client.go deleted file mode 100644 index 3e95e976..00000000 --- a/pkg/innerring/internal/client/client.go +++ /dev/null @@ -1,339 +0,0 @@ -package frostfsapiclient - -import ( - "context" - "crypto/ecdsa" - "errors" - "fmt" - "io" - - clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/storagegroup" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" -) - -// Client represents FrostFS API client cut down to the needs of a purely IR application. -type Client struct { - key *ecdsa.PrivateKey - - c clientcore.Client -} - -// WrapBasicClient wraps a client.Client instance to use it for FrostFS API RPC. -func (x *Client) WrapBasicClient(c clientcore.Client) { - x.c = c -} - -// SetPrivateKey sets a private key to sign RPC requests. -func (x *Client) SetPrivateKey(key *ecdsa.PrivateKey) { - x.key = key -} - -// SearchSGPrm groups parameters of SearchSG operation. -type SearchSGPrm struct { - cnrID cid.ID -} - -// SetContainerID sets the ID of the container to search for storage groups. -func (x *SearchSGPrm) SetContainerID(id cid.ID) { - x.cnrID = id -} - -// SearchSGRes groups the resulting values of SearchSG operation. -type SearchSGRes struct { - cliRes []oid.ID -} - -// IDList returns a list of IDs of storage groups in the container. -func (x SearchSGRes) IDList() []oid.ID { - return x.cliRes -} - -var sgFilter = storagegroup.SearchQuery() - -// SearchSG lists objects of storage group type in the container. -// -// Returns any error which prevented the operation from completing correctly in error return. -func (x Client) SearchSG(ctx context.Context, prm SearchSGPrm) (*SearchSGRes, error) { - var cliPrm client.PrmObjectSearch - cliPrm.InContainer(prm.cnrID) - cliPrm.SetFilters(sgFilter) - cliPrm.UseKey(*x.key) - - rdr, err := x.c.ObjectSearchInit(ctx, cliPrm) - if err != nil { - return nil, fmt.Errorf("init object search: %w", err) - } - - buf := make([]oid.ID, 10) - var list []oid.ID - var n int - var ok bool - - for { - n, ok = rdr.Read(buf) - for i := 0; i < n; i++ { - list = append(list, buf[i]) - } - if !ok { - break - } - } - - res, err := rdr.Close() - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(res.Status()) - } - - if err != nil { - return nil, fmt.Errorf("read object list: %w", err) - } - - return &SearchSGRes{ - cliRes: list, - }, nil -} - -// GetObjectPrm groups parameters of GetObject operation. -type GetObjectPrm struct { - getObjectPrm -} - -// GetObjectRes groups the resulting values of GetObject operation. -type GetObjectRes struct { - obj *object.Object -} - -// Object returns the received object. -func (x GetObjectRes) Object() *object.Object { - return x.obj -} - -// GetObject reads the object by address. -// -// Returns any error which prevented the operation from completing correctly in error return. -func (x Client) GetObject(ctx context.Context, prm GetObjectPrm) (*GetObjectRes, error) { - var cliPrm client.PrmObjectGet - cliPrm.FromContainer(prm.objAddr.Container()) - cliPrm.ByID(prm.objAddr.Object()) - cliPrm.UseKey(*x.key) - - rdr, err := x.c.ObjectGetInit(ctx, cliPrm) - if err != nil { - return nil, fmt.Errorf("init object search: %w", err) - } - - var obj object.Object - - if !rdr.ReadHeader(&obj) { - res, err := rdr.Close() - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(res.Status()) - } - - return nil, fmt.Errorf("read object header: %w", err) - } - - buf := make([]byte, obj.PayloadSize()) - - _, err = rdr.Read(buf) - if err != nil && !errors.Is(err, io.EOF) { - return nil, fmt.Errorf("read payload: %w", err) - } - - obj.SetPayload(buf) - - return &GetObjectRes{ - obj: &obj, - }, nil -} - -// HeadObjectPrm groups parameters of HeadObject operation. -type HeadObjectPrm struct { - getObjectPrm - - raw bool - - local bool -} - -// SetRawFlag sets flag of raw request. -func (x *HeadObjectPrm) SetRawFlag() { - x.raw = true -} - -// SetTTL sets request TTL value. -func (x *HeadObjectPrm) SetTTL(ttl uint32) { - x.local = ttl < 2 -} - -// HeadObjectRes groups the resulting values of HeadObject operation. -type HeadObjectRes struct { - hdr *object.Object -} - -// Header returns the received object header. -func (x HeadObjectRes) Header() *object.Object { - return x.hdr -} - -// HeadObject reads short object header by address. -// -// Returns any error which prevented the operation from completing correctly in error return. -// For raw requests, returns *object.SplitInfoError error if the requested object is virtual. -func (x Client) HeadObject(ctx context.Context, prm HeadObjectPrm) (*HeadObjectRes, error) { - var cliPrm client.PrmObjectHead - - if prm.raw { - cliPrm.MarkRaw() - } - - if prm.local { - cliPrm.MarkLocal() - } - - cliPrm.FromContainer(prm.objAddr.Container()) - cliPrm.ByID(prm.objAddr.Object()) - cliPrm.UseKey(*x.key) - - cliRes, err := x.c.ObjectHead(ctx, cliPrm) - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(cliRes.Status()) - } - - if err != nil { - return nil, fmt.Errorf("read object header from FrostFS: %w", err) - } - - var hdr object.Object - - if !cliRes.ReadHeader(&hdr) { - return nil, errors.New("missing object header in the response") - } - - return &HeadObjectRes{ - hdr: &hdr, - }, nil -} - -// GetObjectPayload reads an object by address from FrostFS via Client and returns its payload. -// -// Returns any error which prevented the operation from completing correctly in error return. -func GetObjectPayload(ctx context.Context, c Client, addr oid.Address) ([]byte, error) { - var prm GetObjectPrm - - prm.SetAddress(addr) - - obj, err := c.GetObject(ctx, prm) - if err != nil { - return nil, err - } - - return obj.Object().Payload(), nil -} - -func headObject(ctx context.Context, c Client, addr oid.Address, raw bool, ttl uint32) (*object.Object, error) { - var prm HeadObjectPrm - - prm.SetAddress(addr) - prm.SetTTL(ttl) - - if raw { - prm.SetRawFlag() - } - - obj, err := c.HeadObject(ctx, prm) - if err != nil { - return nil, err - } - - return obj.Header(), nil -} - -// GetRawObjectHeaderLocally reads the raw short object header from the server's local storage by address via Client. -func GetRawObjectHeaderLocally(ctx context.Context, c Client, addr oid.Address) (*object.Object, error) { - return headObject(ctx, c, addr, true, 1) -} - -// GetObjectHeaderFromContainer reads the short object header by address via Client with TTL = 10 -// for deep traversal of the container. -func GetObjectHeaderFromContainer(ctx context.Context, c Client, addr oid.Address) (*object.Object, error) { - return headObject(ctx, c, addr, false, 10) -} - -// HashPayloadRangePrm groups parameters of HashPayloadRange operation. -type HashPayloadRangePrm struct { - getObjectPrm - - rng *object.Range -} - -// SetRange sets payload range to calculate the hash. -func (x *HashPayloadRangePrm) SetRange(rng *object.Range) { - x.rng = rng -} - -// HashPayloadRangeRes groups the resulting values of HashPayloadRange operation. -type HashPayloadRangeRes struct { - h []byte -} - -// Hash returns the hash of the object payload range. -func (x HashPayloadRangeRes) Hash() []byte { - return x.h -} - -// HashPayloadRange requests to calculate Tillich-Zemor hash of the payload range of the object -// from the remote server's local storage. -// -// Returns any error which prevented the operation from completing correctly in error return. -func (x Client) HashPayloadRange(ctx context.Context, prm HashPayloadRangePrm) (res HashPayloadRangeRes, err error) { - var cliPrm client.PrmObjectHash - cliPrm.FromContainer(prm.objAddr.Container()) - cliPrm.ByID(prm.objAddr.Object()) - cliPrm.SetRangeList(prm.rng.GetOffset(), prm.rng.GetLength()) - cliPrm.TillichZemorAlgo() - - cliRes, err := x.c.ObjectHash(ctx, cliPrm) - if err == nil { - // pull out an error from status - err = apistatus.ErrFromStatus(cliRes.Status()) - if err != nil { - return - } - - hs := cliRes.Checksums() - if ln := len(hs); ln != 1 { - err = fmt.Errorf("wrong number of checksums %d", ln) - } else { - res.h = hs[0] - } - } - - return -} - -// HashObjectRange reads Tillich-Zemor hash of the object payload range by address -// from the remote server's local storage via Client. -// -// Returns any error which prevented the operation from completing correctly in error return. -func HashObjectRange(ctx context.Context, c Client, addr oid.Address, rng *object.Range) ([]byte, error) { - var prm HashPayloadRangePrm - - prm.SetAddress(addr) - prm.SetRange(rng) - - res, err := c.HashPayloadRange(ctx, prm) - if err != nil { - return nil, err - } - - return res.Hash(), nil -} diff --git a/pkg/innerring/internal/client/doc.go b/pkg/innerring/internal/client/doc.go deleted file mode 100644 index a04b0627..00000000 --- a/pkg/innerring/internal/client/doc.go +++ /dev/null @@ -1,12 +0,0 @@ -// Package frostfsapiclient provides functionality for IR application communication with FrostFS network. -// -// The basic client for accessing remote nodes via FrostFS API is a FrostFS SDK Go API client. -// However, although it encapsulates a useful piece of business logic (e.g. the signature mechanism), -// the IR application does not fully use the client's flexible interface. -// -// In this regard, this package represents an abstraction -- a type-wrapper over the base client. -// The type provides the minimum interface necessary for the application and also allows you to concentrate -// the entire spectrum of the client's use in one place (this will be convenient both when updating the base client -// and for evaluating the UX of SDK library). So, it is expected that all application packages will be limited -// to this package for the development of functionality requiring FrostFS API communication. -package frostfsapiclient diff --git a/pkg/innerring/internal/client/prm.go b/pkg/innerring/internal/client/prm.go deleted file mode 100644 index 9b587243..00000000 --- a/pkg/innerring/internal/client/prm.go +++ /dev/null @@ -1,18 +0,0 @@ -package frostfsapiclient - -import ( - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" -) - -type objectAddressPrm struct { - objAddr oid.Address -} - -// SetAddress sets address of the object. -func (x *objectAddressPrm) SetAddress(addr oid.Address) { - x.objAddr = addr -} - -type getObjectPrm struct { - objectAddressPrm -} diff --git a/pkg/innerring/processors/audit/events.go b/pkg/innerring/processors/audit/events.go deleted file mode 100644 index 4fb10612..00000000 --- a/pkg/innerring/processors/audit/events.go +++ /dev/null @@ -1,19 +0,0 @@ -package audit - -// Start is an event to start a new round of data audit. -type Start struct { - epoch uint64 -} - -// MorphEvent implements the Event interface. -func (a Start) MorphEvent() {} - -func NewAuditStartEvent(epoch uint64) Start { - return Start{ - epoch: epoch, - } -} - -func (a Start) Epoch() uint64 { - return a.epoch -} diff --git a/pkg/innerring/processors/audit/handlers.go b/pkg/innerring/processors/audit/handlers.go deleted file mode 100644 index 06c656fa..00000000 --- a/pkg/innerring/processors/audit/handlers.go +++ /dev/null @@ -1,22 +0,0 @@ -package audit - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" - "go.uber.org/zap" -) - -func (ap *Processor) handleNewAuditRound(ev event.Event) { - auditEvent := ev.(Start) - - epoch := auditEvent.Epoch() - - ap.log.Info(logs.AuditNewRoundOfAudit, zap.Uint64("epoch", epoch)) - - // send an event to the worker pool - - err := ap.pool.Submit(func() { ap.processStartAudit(epoch) }) - if err != nil { - ap.log.Warn(logs.AuditPreviousRoundOfAuditPrepareHasntFinishedYet) - } -} diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go deleted file mode 100644 index 000279f0..00000000 --- a/pkg/innerring/processors/audit/process.go +++ /dev/null @@ -1,217 +0,0 @@ -package audit - -import ( - "context" - "crypto/sha256" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup" - cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.uber.org/zap" -) - -func (ap *Processor) processStartAudit(epoch uint64) { - log := ap.log.With(zap.Uint64("epoch", epoch)) - - ap.prevAuditCanceler() - - skipped := ap.taskManager.Reset() - if skipped > 0 { - ap.log.Info(logs.AuditSomeTasksFromPreviousEpochAreSkipped, - zap.Int("amount", skipped), - ) - } - - containers, err := ap.selectContainersToAudit(epoch) - if err != nil { - log.Error(logs.AuditContainerSelectionFailure, zap.String("error", err.Error())) - - return - } - - log.Info(logs.AuditSelectContainersForAudit, zap.Int("amount", len(containers))) - - nm, err := ap.netmapClient.GetNetMap(0) - if err != nil { - ap.log.Error(logs.AuditCantFetchNetworkMap, - zap.String("error", err.Error())) - - return - } - - cancelChannel := make(chan struct{}) - ap.prevAuditCanceler = func() { - select { - case <-cancelChannel: // already closed - default: - close(cancelChannel) - } - } - - pivot := make([]byte, sha256.Size) - - ap.startAuditTasksOnContainers(cancelChannel, containers, log, pivot, nm, epoch) -} - -func (ap *Processor) startAuditTasksOnContainers(cancelChannel <-chan struct{}, containers []cid.ID, log *zap.Logger, pivot []byte, nm *netmap.NetMap, epoch uint64) { - for i := range containers { - cnr, err := cntClient.Get(ap.containerClient, containers[i]) // get container structure - if err != nil { - log.Error(logs.AuditCantGetContainerInfoIgnore, - zap.Stringer("cid", containers[i]), - zap.String("error", err.Error())) - - continue - } - - containers[i].Encode(pivot) - - // find all container nodes for current epoch - nodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), pivot) - if err != nil { - log.Info(logs.AuditCantBuildPlacementForContainerIgnore, - zap.Stringer("cid", containers[i]), - zap.String("error", err.Error())) - - continue - } - - n := placement.FlattenNodes(nodes) - - // shuffle nodes to ask a random one - rand.Shuffle(len(n), func(i, j int) { - n[i], n[j] = n[j], n[i] - }) - - // search storage groups - storageGroupsIDs := ap.findStorageGroups(containers[i], n) - log.Info(logs.AuditSelectStorageGroupsForAudit, - zap.Stringer("cid", containers[i]), - zap.Int("amount", len(storageGroupsIDs))) - - // filter expired storage groups - storageGroups := ap.filterExpiredSG(containers[i], storageGroupsIDs, nodes, *nm) - log.Info(logs.AuditFilterExpiredStorageGroupsForAudit, - zap.Stringer("cid", containers[i]), - zap.Int("amount", len(storageGroups))) - - // skip audit for containers without - // non-expired storage groups - if len(storageGroupsIDs) == 0 { - continue - } - - auditTask := new(audit.Task). - WithReporter(&epochAuditReporter{ - epoch: epoch, - rep: ap.reporter, - }). - WithCancelChannel(cancelChannel). - WithContainerID(containers[i]). - WithStorageGroupList(storageGroups). - WithContainerStructure(cnr.Value). - WithContainerNodes(nodes). - WithNetworkMap(nm) - - ap.taskManager.PushTask(auditTask) - } -} - -func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []oid.ID { - var sg []oid.ID - - ln := len(shuffled) - - var ( - info clientcore.NodeInfo - prm storagegroup.SearchSGPrm - ) - - prm.Container = cnr - - for i := range shuffled { // consider iterating over some part of container - log := ap.log.With( - zap.Stringer("cid", cnr), - zap.String("key", netmap.StringifyPublicKey(shuffled[0])), - zap.Int("try", i), - zap.Int("total_tries", ln), - ) - - err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(shuffled[i])) - if err != nil { - log.Warn(logs.AuditParseClientNodeInfo, zap.String("error", err.Error())) - - continue - } - - ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout) - - prm.NodeInfo = info - - var dst storagegroup.SearchSGDst - - err = ap.sgSrc.ListSG(ctx, &dst, prm) - - cancel() - - if err != nil { - log.Warn(logs.AuditErrorInStorageGroupSearch, zap.String("error", err.Error())) - continue - } - - sg = append(sg, dst.Objects...) - - break // we found storage groups, so break loop - } - - return sg -} - -func (ap *Processor) filterExpiredSG(cid cid.ID, sgIDs []oid.ID, - cnr [][]netmap.NodeInfo, nm netmap.NetMap) []storagegroup.StorageGroup { - sgs := make([]storagegroup.StorageGroup, 0, len(sgIDs)) - var coreSG storagegroup.StorageGroup - - var getSGPrm storagegroup.GetSGPrm - getSGPrm.CID = cid - getSGPrm.Container = cnr - getSGPrm.NetMap = nm - - for _, sgID := range sgIDs { - ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout) - - getSGPrm.OID = sgID - - sg, err := ap.sgSrc.GetSG(ctx, getSGPrm) - - cancel() - - if err != nil { - ap.log.Error( - "could not get storage group object for audit, skipping", - zap.Stringer("cid", cid), - zap.Stringer("oid", sgID), - zap.Error(err), - ) - continue - } - - // filter expired epochs - if sg.ExpirationEpoch() >= ap.epochSrc.EpochCounter() { - coreSG.SetID(sgID) - coreSG.SetStorageGroup(*sg) - - sgs = append(sgs, coreSG) - } - } - - return sgs -} diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go deleted file mode 100644 index 6e0a9820..00000000 --- a/pkg/innerring/processors/audit/processor.go +++ /dev/null @@ -1,143 +0,0 @@ -package audit - -import ( - "context" - "crypto/ecdsa" - "errors" - "fmt" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup" - cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" - nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "github.com/panjf2000/ants/v2" -) - -type ( - // Indexer is a callback interface for inner ring global state. - Indexer interface { - InnerRingIndex() int - InnerRingSize() int - } - - TaskManager interface { - PushTask(*audit.Task) - - // Must skip all tasks planned for execution and - // return their number. - Reset() int - } - - // EpochSource is an interface that provides actual - // epoch information. - EpochSource interface { - // EpochCounter must return current epoch number. - EpochCounter() uint64 - } - - // Processor of events related to data audit. - Processor struct { - log *logger.Logger - pool *ants.Pool - irList Indexer - sgSrc storagegroup.SGSource - epochSrc EpochSource - searchTimeout time.Duration - - containerClient *cntClient.Client - netmapClient *nmClient.Client - - taskManager TaskManager - reporter audit.Reporter - prevAuditCanceler context.CancelFunc - } - - // Params of the processor constructor. - Params struct { - Log *logger.Logger - NetmapClient *nmClient.Client - ContainerClient *cntClient.Client - IRList Indexer - SGSource storagegroup.SGSource - RPCSearchTimeout time.Duration - TaskManager TaskManager - Reporter audit.Reporter - Key *ecdsa.PrivateKey - EpochSource EpochSource - } -) - -type epochAuditReporter struct { - epoch uint64 - - rep audit.Reporter -} - -// ProcessorPoolSize limits pool size for audit Processor. Processor manages -// audit tasks and fills queue for the next epoch. This process must not be interrupted -// by a new audit epoch, so we limit the pool size for the processor to one. -const ProcessorPoolSize = 1 - -// New creates audit processor instance. -func New(p *Params) (*Processor, error) { - switch { - case p.Log == nil: - return nil, errors.New("ir/audit: logger is not set") - case p.IRList == nil: - return nil, errors.New("ir/audit: global state is not set") - case p.SGSource == nil: - return nil, errors.New("ir/audit: SG source is not set") - case p.TaskManager == nil: - return nil, errors.New("ir/audit: audit task manager is not set") - case p.Reporter == nil: - return nil, errors.New("ir/audit: audit result reporter is not set") - case p.Key == nil: - return nil, errors.New("ir/audit: signing key is not set") - case p.EpochSource == nil: - return nil, errors.New("ir/audit: epoch source is not set") - } - - pool, err := ants.NewPool(ProcessorPoolSize, ants.WithNonblocking(true)) - if err != nil { - return nil, fmt.Errorf("ir/audit: can't create worker pool: %w", err) - } - - return &Processor{ - log: p.Log, - pool: pool, - containerClient: p.ContainerClient, - irList: p.IRList, - sgSrc: p.SGSource, - epochSrc: p.EpochSource, - searchTimeout: p.RPCSearchTimeout, - netmapClient: p.NetmapClient, - taskManager: p.TaskManager, - reporter: p.Reporter, - prevAuditCanceler: func() {}, - }, nil -} - -// ListenerNotificationParsers for the 'event.Listener' event producer. -func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { - return nil -} - -// ListenerNotificationHandlers for the 'event.Listener' event producer. -func (ap *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { - return nil -} - -// StartAuditHandler for the internal event producer. -func (ap *Processor) StartAuditHandler() event.Handler { - return ap.handleNewAuditRound -} - -func (r *epochAuditReporter) WriteReport(rep *audit.Report) error { - res := rep.Result() - res.ForEpoch(r.epoch) - - return r.rep.WriteReport(rep) -} diff --git a/pkg/innerring/processors/audit/scheduler.go b/pkg/innerring/processors/audit/scheduler.go deleted file mode 100644 index fbc5fa92..00000000 --- a/pkg/innerring/processors/audit/scheduler.go +++ /dev/null @@ -1,65 +0,0 @@ -package audit - -import ( - "errors" - "fmt" - "sort" - "strings" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "go.uber.org/zap" -) - -var ErrInvalidIRNode = errors.New("node is not in the inner ring list") - -func (ap *Processor) selectContainersToAudit(epoch uint64) ([]cid.ID, error) { - containers, err := ap.containerClient.ContainersOf(nil) - if err != nil { - return nil, fmt.Errorf("can't get list of containers to start audit: %w", err) - } - - // consider getting extra information about container complexity from - // audit contract there - ap.log.Debug(logs.AuditContainerListingFinished, - zap.Int("total amount", len(containers)), - ) - - sort.Slice(containers, func(i, j int) bool { - return strings.Compare(containers[i].EncodeToString(), containers[j].EncodeToString()) < 0 - }) - - ind := ap.irList.InnerRingIndex() - irSize := ap.irList.InnerRingSize() - - if ind < 0 || ind >= irSize { - return nil, ErrInvalidIRNode - } - - return Select(containers, epoch, uint64(ind), uint64(irSize)), nil -} - -func Select(ids []cid.ID, epoch, index, size uint64) []cid.ID { - if index >= size { - return nil - } - - var a, b uint64 - - ln := uint64(len(ids)) - pivot := ln % size - delta := ln / size - - index = (index + epoch) % size - if index < pivot { - a = delta + 1 - } else { - a = delta - b = pivot - } - - from := a*index + b - to := a*(index+1) + b - - return ids[from:to] -} diff --git a/pkg/innerring/processors/audit/scheduler_test.go b/pkg/innerring/processors/audit/scheduler_test.go deleted file mode 100644 index 51461beb..00000000 --- a/pkg/innerring/processors/audit/scheduler_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package audit_test - -import ( - "testing" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" - "github.com/stretchr/testify/require" -) - -func TestSelect(t *testing.T) { - cids := generateContainers(10) - - t.Run("invalid input", func(t *testing.T) { - require.Empty(t, audit.Select(cids, 0, 0, 0)) - }) - - t.Run("even split", func(t *testing.T) { - const irSize = 5 // every node takes two audit nodes - - m := hitMap(cids) - - for i := 0; i < irSize; i++ { - s := audit.Select(cids, 0, uint64(i), irSize) - require.Equal(t, len(cids)/irSize, len(s)) - - for _, id := range s { - n, ok := m[id.EncodeToString()] - require.True(t, ok) - require.Equal(t, 0, n) - m[id.EncodeToString()] = 1 - } - } - - require.True(t, allHit(m)) - }) - - t.Run("odd split", func(t *testing.T) { - const irSize = 3 - - m := hitMap(cids) - - for i := 0; i < irSize; i++ { - s := audit.Select(cids, 0, uint64(i), irSize) - - for _, id := range s { - n, ok := m[id.EncodeToString()] - require.True(t, ok) - require.Equal(t, 0, n) - m[id.EncodeToString()] = 1 - } - } - - require.True(t, allHit(m)) - }) - - t.Run("epoch shift", func(t *testing.T) { - const irSize = 4 - - m := hitMap(cids) - - for i := 0; i < irSize; i++ { - s := audit.Select(cids, uint64(i), 0, irSize) - - for _, id := range s { - n, ok := m[id.EncodeToString()] - require.True(t, ok) - require.Equal(t, 0, n) - m[id.EncodeToString()] = 1 - } - } - - require.True(t, allHit(m)) - }) -} - -func generateContainers(n int) []cid.ID { - result := make([]cid.ID, n) - - for i := 0; i < n; i++ { - result[i] = cidtest.ID() - } - - return result -} - -func hitMap(ids []cid.ID) map[string]int { - result := make(map[string]int, len(ids)) - - for _, id := range ids { - result[id.EncodeToString()] = 0 - } - - return result -} - -func allHit(m map[string]int) bool { - for _, v := range m { - if v == 0 { - return false - } - } - - return true -} diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index 17e445b1..96ea4a01 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -2,9 +2,7 @@ package netmap import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap" "go.uber.org/zap" @@ -63,8 +61,6 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { np.netmapSnapshot.update(*networkMap, epoch) np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()}) - np.handleNewAudit(audit.NewAuditStartEvent(epoch)) - np.handleAuditSettlements(settlement.NewAuditEvent(epoch)) np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash())) np.handleNotaryDeposit(ev) } diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index f6f490ce..39069a96 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -65,10 +65,8 @@ type ( netmapSnapshot cleanupTable - handleNewAudit event.Handler - handleAuditSettlements event.Handler - handleAlphabetSync event.Handler - handleNotaryDeposit event.Handler + handleAlphabetSync event.Handler + handleNotaryDeposit event.Handler nodeValidator NodeValidator @@ -89,10 +87,8 @@ type ( CleanupThreshold uint64 // in epochs ContainerWrapper *container.Client - HandleAudit event.Handler - AuditSettlementsHandler event.Handler - AlphabetSyncHandler event.Handler - NotaryDepositHandler event.Handler + AlphabetSyncHandler event.Handler + NotaryDepositHandler event.Handler NodeValidator NodeValidator @@ -119,10 +115,6 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/netmap: global state is not set") case p.AlphabetState == nil: return nil, errors.New("ir/netmap: global state is not set") - case p.HandleAudit == nil: - return nil, errors.New("ir/netmap: audit handler is not set") - case p.AuditSettlementsHandler == nil: - return nil, errors.New("ir/netmap: audit settlement handler is not set") case p.AlphabetSyncHandler == nil: return nil, errors.New("ir/netmap: alphabet sync handler is not set") case p.NotaryDepositHandler == nil: @@ -151,9 +143,6 @@ func New(p *Params) (*Processor, error) { netmapClient: p.NetmapClient, containerWrp: p.ContainerWrapper, netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold), - handleNewAudit: p.HandleAudit, - - handleAuditSettlements: p.AuditSettlementsHandler, handleAlphabetSync: p.AlphabetSyncHandler, diff --git a/pkg/innerring/processors/settlement/audit/calculate.go b/pkg/innerring/processors/settlement/audit/calculate.go deleted file mode 100644 index 75b8c56a..00000000 --- a/pkg/innerring/processors/settlement/audit/calculate.go +++ /dev/null @@ -1,336 +0,0 @@ -package audit - -import ( - "bytes" - "crypto/ecdsa" - "crypto/elliptic" - "encoding/hex" - "math/big" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/audit" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" - "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "go.uber.org/zap" -) - -// CalculatePrm groups the required parameters of -// Calculator.CalculateForEpoch call. -type CalculatePrm struct { - // Number of epoch to perform the calculation. - Epoch uint64 -} - -type singleResultCtx struct { - eAudit uint64 - - auditResult *audit.Result - - log *logger.Logger - - txTable *common.TransferTable - - cnrInfo common.ContainerInfo - - cnrNodes []common.NodeInfo - - passNodes map[string]common.NodeInfo - - sumSGSize *big.Int - - auditFee *big.Int -} - -var ( - bigGB = big.NewInt(1 << 30) - bigZero = big.NewInt(0) - bigOne = big.NewInt(1) -) - -// Calculate calculates payments for audit results in a specific epoch of the network. -// Wraps the results in a money transfer transaction and sends it to the network. -func (c *Calculator) Calculate(p *CalculatePrm) { - log := &logger.Logger{Logger: c.opts.log.With( - zap.Uint64("current epoch", p.Epoch), - )} - - if p.Epoch == 0 { - log.Info(logs.AuditSettlementsAreIgnoredForZeroEpoch) - return - } - - log.Info(logs.AuditCalculateAuditSettlements) - - log.Debug(logs.AuditGettingResultsForThePreviousEpoch) - prevEpoch := p.Epoch - 1 - - auditResults, err := c.prm.ResultStorage.AuditResultsForEpoch(prevEpoch) - if err != nil { - log.Error(logs.AuditCouldNotCollectAuditResults) - return - } else if len(auditResults) == 0 { - log.Debug(logs.AuditNoAuditResultsInPreviousEpoch) - return - } - - auditFee, err := c.prm.AuditFeeFetcher.AuditFee() - if err != nil { - log.Warn(logs.AuditCantFetchAuditFeeFromNetworkConfig, - zap.String("error", err.Error())) - auditFee = 0 - } - - log.Debug(logs.AuditProcessingAuditResults, - zap.Int("number", len(auditResults)), - ) - - table := common.NewTransferTable() - - for i := range auditResults { - c.processResult(&singleResultCtx{ - log: log, - auditResult: auditResults[i], - txTable: table, - auditFee: big.NewInt(0).SetUint64(auditFee), - }) - } - - log.Debug(logs.AuditProcessingTransfers) - - common.TransferAssets(c.prm.Exchanger, table, common.AuditSettlementDetails(prevEpoch)) -} - -func (c *Calculator) processResult(ctx *singleResultCtx) { - ctx.log = &logger.Logger{Logger: ctx.log.With( - zap.Stringer("cid", ctx.containerID()), - zap.Uint64("audit epoch", ctx.auditResult.Epoch()), - )} - - ctx.log.Debug(logs.AuditReadingInformationAboutTheContainer) - - ok := c.readContainerInfo(ctx) - if !ok { - return - } - - ctx.log.Debug(logs.AuditBuildingPlacement) - - ok = c.buildPlacement(ctx) - if !ok { - return - } - - ctx.log.Debug(logs.AuditCollectingPassedNodes) - - ok = c.collectPassNodes(ctx) - if !ok { - return - } - - ctx.log.Debug(logs.AuditCalculatingSumOfTheSizesOfAllStorageGroups) - - ok = c.sumSGSizes(ctx) - if !ok { - return - } - - ctx.log.Debug(logs.AuditFillingTransferTable) - - c.fillTransferTable(ctx) -} - -func (c *Calculator) readContainerInfo(ctx *singleResultCtx) bool { - cnr, ok := ctx.auditResult.Container() - if !ok { - ctx.log.Error(logs.AuditMissingContainerInAuditResult) - return false - } - - var err error - - ctx.cnrInfo, err = c.prm.ContainerStorage.ContainerInfo(cnr) - if err != nil { - ctx.log.Error(logs.AuditCouldNotGetContainerInfo, - zap.String("error", err.Error()), - ) - } - - return err == nil -} - -func (c *Calculator) buildPlacement(ctx *singleResultCtx) bool { - var err error - - ctx.cnrNodes, err = c.prm.PlacementCalculator.ContainerNodes(ctx.auditEpoch(), ctx.containerID()) - if err != nil { - ctx.log.Error(logs.AuditCouldNotGetContainerNodes, - zap.String("error", err.Error()), - ) - } - - empty := len(ctx.cnrNodes) == 0 - if empty { - ctx.log.Debug(logs.AuditEmptyListOfContainerNodes) - } - - return err == nil && !empty -} - -func (c *Calculator) collectPassNodes(ctx *singleResultCtx) bool { - ctx.passNodes = make(map[string]common.NodeInfo) - - for _, cnrNode := range ctx.cnrNodes { - // TODO(@cthulhu-rider): neofs-sdk-go#241 use dedicated method - ctx.auditResult.IteratePassedStorageNodes(func(passNode []byte) bool { - if !bytes.Equal(cnrNode.PublicKey(), passNode) { - return true - } - - failed := false - - ctx.auditResult.IterateFailedStorageNodes(func(failNode []byte) bool { - failed = bytes.Equal(cnrNode.PublicKey(), failNode) - return !failed - }) - - if !failed { - ctx.passNodes[hex.EncodeToString(passNode)] = cnrNode - } - - return false - }) - } - - empty := len(ctx.passNodes) == 0 - if empty { - ctx.log.Debug(logs.AuditNoneOfTheContainerNodesPassedTheAudit) - } - - return !empty -} - -func (c *Calculator) sumSGSizes(ctx *singleResultCtx) bool { - sumPassSGSize := uint64(0) - fail := false - - var addr oid.Address - addr.SetContainer(ctx.containerID()) - - ctx.auditResult.IteratePassedStorageGroups(func(id oid.ID) bool { - addr.SetObject(id) - - sgInfo, err := c.prm.SGStorage.SGInfo(addr) - if err != nil { - ctx.log.Error(logs.AuditCouldNotGetSGInfo, - zap.String("id", id.String()), - zap.String("error", err.Error()), - ) - - fail = true - - return false // we also can continue and calculate at least some part - } - - sumPassSGSize += sgInfo.Size() - - return true - }) - - if fail { - return false - } - - if sumPassSGSize == 0 { - ctx.log.Debug(logs.AuditZeroSumSGSize) - return false - } - - ctx.sumSGSize = big.NewInt(int64(sumPassSGSize)) - - return true -} - -func (c *Calculator) fillTransferTable(ctx *singleResultCtx) bool { - cnrOwner := ctx.cnrInfo.Owner() - - // add txs to pay for storage node - for k, info := range ctx.passNodes { - ownerID, err := c.prm.AccountStorage.ResolveKey(info) - if err != nil { - ctx.log.Error(logs.AuditCouldNotResolvePublicKeyOfTheStorageNode, - zap.String("error", err.Error()), - zap.String("key", k), - ) - - return false // we also can continue and calculate at least some part - } - - price := info.Price() - - ctx.log.Debug(logs.AuditCalculatingStorageNodeSalaryForAudit, - zap.Stringer("sum SG size", ctx.sumSGSize), - zap.Stringer("price", price), - ) - - fee := big.NewInt(0).Mul(price, ctx.sumSGSize) - fee.Div(fee, bigGB) - - if fee.Cmp(bigZero) == 0 { - fee.Add(fee, bigOne) - } - - ctx.txTable.Transfer(&common.TransferTx{ - From: cnrOwner, - To: *ownerID, - Amount: fee, - }) - } - - // add txs to pay inner ring node for audit result - auditIR, err := ownerFromKey(ctx.auditResult.AuditorKey()) - if err != nil { - ctx.log.Error(logs.AuditCouldNotParsePublicKeyOfTheInnerRingNode, - zap.String("error", err.Error()), - zap.String("key", hex.EncodeToString(ctx.auditResult.AuditorKey())), - ) - - return false - } - - ctx.txTable.Transfer(&common.TransferTx{ - From: cnrOwner, - To: *auditIR, - Amount: ctx.auditFee, - }) - - return false -} - -func (c *singleResultCtx) containerID() cid.ID { - cnr, _ := c.auditResult.Container() - return cnr -} - -func (c *singleResultCtx) auditEpoch() uint64 { - if c.eAudit == 0 { - c.eAudit = c.auditResult.Epoch() - } - - return c.eAudit -} - -func ownerFromKey(key []byte) (*user.ID, error) { - pubKey, err := keys.NewPublicKeyFromBytes(key, elliptic.P256()) - if err != nil { - return nil, err - } - - var id user.ID - user.IDFromKey(&id, (ecdsa.PublicKey)(*pubKey)) - - return &id, nil -} diff --git a/pkg/innerring/processors/settlement/audit/calculator.go b/pkg/innerring/processors/settlement/audit/calculator.go deleted file mode 100644 index fb8d8207..00000000 --- a/pkg/innerring/processors/settlement/audit/calculator.go +++ /dev/null @@ -1,48 +0,0 @@ -package audit - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "go.uber.org/zap" -) - -// Calculator represents a component for calculating payments -// based on data audit results and sending remittances to the chain. -type Calculator struct { - prm *CalculatorPrm - - opts *options -} - -// CalculatorOption is a Calculator constructor's option. -type CalculatorOption func(*options) - -type options struct { - log *logger.Logger -} - -func defaultOptions() *options { - return &options{ - log: &logger.Logger{Logger: zap.L()}, - } -} - -// NewCalculator creates, initializes and returns a new Calculator instance. -func NewCalculator(p *CalculatorPrm, opts ...CalculatorOption) *Calculator { - o := defaultOptions() - - for i := range opts { - opts[i](o) - } - - return &Calculator{ - prm: p, - opts: o, - } -} - -// WithLogger returns an option to specify the logging component. -func WithLogger(l *logger.Logger) CalculatorOption { - return func(o *options) { - o.log = l - } -} diff --git a/pkg/innerring/processors/settlement/audit/prm.go b/pkg/innerring/processors/settlement/audit/prm.go deleted file mode 100644 index d357f0d4..00000000 --- a/pkg/innerring/processors/settlement/audit/prm.go +++ /dev/null @@ -1,49 +0,0 @@ -package audit - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/audit" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" -) - -// CalculatorPrm groups the parameters of Calculator's constructor. -type CalculatorPrm struct { - ResultStorage ResultStorage - - ContainerStorage common.ContainerStorage - - PlacementCalculator common.PlacementCalculator - - SGStorage SGStorage - - AccountStorage common.AccountStorage - - Exchanger common.Exchanger - - AuditFeeFetcher FeeFetcher -} - -// ResultStorage is an interface of storage of the audit results. -type ResultStorage interface { - // Must return all audit results by epoch number. - AuditResultsForEpoch(epoch uint64) ([]*audit.Result, error) -} - -// SGInfo groups the data about FrostFS storage group -// necessary for calculating audit fee. -type SGInfo interface { - // Must return sum size of the all group members. - Size() uint64 -} - -// SGStorage is an interface of storage of the storage groups. -type SGStorage interface { - // Must return information about the storage group by address. - SGInfo(oid.Address) (SGInfo, error) -} - -// FeeFetcher wraps AuditFee method that returns audit fee price from -// the network configuration. -type FeeFetcher interface { - AuditFee() (uint64, error) -} diff --git a/pkg/innerring/processors/settlement/basic/collect.go b/pkg/innerring/processors/settlement/basic/collect.go deleted file mode 100644 index 024769c0..00000000 --- a/pkg/innerring/processors/settlement/basic/collect.go +++ /dev/null @@ -1,114 +0,0 @@ -package basic - -import ( - "math/big" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common" - cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" - "go.uber.org/zap" -) - -var ( - bigGB = big.NewInt(1 << 30) - bigZero = big.NewInt(0) - bigOne = big.NewInt(1) -) - -func (inc *IncomeSettlementContext) Collect() { - inc.mu.Lock() - defer inc.mu.Unlock() - - cachedRate, err := inc.rate.BasicRate() - if err != nil { - inc.log.Error(logs.BasicCantGetBasicIncomeRate, - zap.String("error", err.Error())) - - return - } - - if cachedRate == 0 { - inc.noop = true - return - } - - cnrEstimations, err := inc.estimations.Estimations(inc.epoch) - if err != nil { - inc.log.Error(logs.BasicCantFetchContainerSizeEstimations, - zap.Uint64("epoch", inc.epoch), - zap.String("error", err.Error())) - - return - } - - txTable := common.NewTransferTable() - - for i := range cnrEstimations { - owner, err := inc.container.ContainerInfo(cnrEstimations[i].ContainerID) - if err != nil { - inc.log.Warn(logs.BasicCantFetchContainerInfo, - zap.Uint64("epoch", inc.epoch), - zap.Stringer("container_id", cnrEstimations[i].ContainerID), - zap.String("error", err.Error())) - - continue - } - - cnrNodes, err := inc.placement.ContainerNodes(inc.epoch, cnrEstimations[i].ContainerID) - if err != nil { - inc.log.Debug(logs.BasicCantFetchContainerInfo, - zap.Uint64("epoch", inc.epoch), - zap.Stringer("container_id", cnrEstimations[i].ContainerID), - zap.String("error", err.Error())) - - continue - } - - avg := inc.avgEstimation(cnrEstimations[i]) // average container size per node - total := calculateBasicSum(avg, cachedRate, len(cnrNodes)) - - // fill distribute asset table - for i := range cnrNodes { - inc.distributeTable.Put(cnrNodes[i].PublicKey(), avg) - } - - txTable.Transfer(&common.TransferTx{ - From: owner.Owner(), - To: inc.bankOwner, - Amount: total, - }) - } - - common.TransferAssets(inc.exchange, txTable, common.BasicIncomeCollectionDetails(inc.epoch)) -} - -// avgEstimation returns estimation value for a single container. Right now it -// simply calculates an average of all announcements, however it can be smarter and -// base the result on reputation of the announcers and clever math. -func (inc *IncomeSettlementContext) avgEstimation(e *cntClient.Estimations) (avg uint64) { - if len(e.Values) == 0 { - return 0 - } - - for i := range e.Values { - avg += e.Values[i].Size - } - - return avg / uint64(len(e.Values)) -} - -func calculateBasicSum(size, rate uint64, ln int) *big.Int { - bigRate := big.NewInt(int64(rate)) - - total := size * uint64(ln) - - price := big.NewInt(0).SetUint64(total) - price.Mul(price, bigRate) - price.Div(price, bigGB) - - if price.Cmp(bigZero) == 0 { - price.Add(price, bigOne) - } - - return price -} diff --git a/pkg/innerring/processors/settlement/basic/context.go b/pkg/innerring/processors/settlement/basic/context.go deleted file mode 100644 index 59bedf2e..00000000 --- a/pkg/innerring/processors/settlement/basic/context.go +++ /dev/null @@ -1,80 +0,0 @@ -package basic - -import ( - "math/big" - "sync" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" - "github.com/nspcc-dev/neo-go/pkg/util" -) - -type ( - EstimationFetcher interface { - Estimations(uint64) ([]*container.Estimations, error) - } - - RateFetcher interface { - BasicRate() (uint64, error) - } - - // BalanceFetcher uses NEP-17 compatible balance contract. - BalanceFetcher interface { - Balance(id user.ID) (*big.Int, error) - } - - IncomeSettlementContext struct { - mu sync.Mutex // lock to prevent collection and distribution in the same time - - noop bool - - log *logger.Logger - epoch uint64 - - rate RateFetcher - estimations EstimationFetcher - balances BalanceFetcher - container common.ContainerStorage - placement common.PlacementCalculator - exchange common.Exchanger - accounts common.AccountStorage - - bankOwner user.ID - - // this table is not thread safe, make sure you use it with mu.Lock() - distributeTable *NodeSizeTable - } - - IncomeSettlementContextPrms struct { - Log *logger.Logger - Epoch uint64 - Rate RateFetcher - Estimations EstimationFetcher - Balances BalanceFetcher - Container common.ContainerStorage - Placement common.PlacementCalculator - Exchange common.Exchanger - Accounts common.AccountStorage - } -) - -func NewIncomeSettlementContext(p *IncomeSettlementContextPrms) *IncomeSettlementContext { - res := &IncomeSettlementContext{ - log: p.Log, - epoch: p.Epoch, - rate: p.Rate, - estimations: p.Estimations, - balances: p.Balances, - container: p.Container, - placement: p.Placement, - exchange: p.Exchange, - accounts: p.Accounts, - distributeTable: NewNodeSizeTable(), - } - - res.bankOwner.SetScriptHash(util.Uint160{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}) - - return res -} diff --git a/pkg/innerring/processors/settlement/basic/distribute.go b/pkg/innerring/processors/settlement/basic/distribute.go deleted file mode 100644 index 44a8ccea..00000000 --- a/pkg/innerring/processors/settlement/basic/distribute.go +++ /dev/null @@ -1,59 +0,0 @@ -package basic - -import ( - "encoding/hex" - "math/big" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common" - "go.uber.org/zap" -) - -func (inc *IncomeSettlementContext) Distribute() { - inc.mu.Lock() - defer inc.mu.Unlock() - - if inc.noop { - return - } - - txTable := common.NewTransferTable() - - bankBalance, err := inc.balances.Balance(inc.bankOwner) - if err != nil { - inc.log.Error(logs.BasicCantFetchBalanceOfBankingAccount, - zap.String("error", err.Error())) - - return - } - - total := inc.distributeTable.Total() - - inc.distributeTable.Iterate(func(key []byte, n *big.Int) { - nodeOwner, err := inc.accounts.ResolveKey(nodeInfoWrapper(key)) - if err != nil { - inc.log.Warn(logs.BasicCantTransformPublicKeyToOwnerID, - zap.String("public_key", hex.EncodeToString(key)), - zap.String("error", err.Error())) - - return - } - - txTable.Transfer(&common.TransferTx{ - From: inc.bankOwner, - To: *nodeOwner, - Amount: normalizedValue(n, total, bankBalance), - }) - }) - - common.TransferAssets(inc.exchange, txTable, common.BasicIncomeDistributionDetails(inc.epoch)) -} - -func normalizedValue(n, total, limit *big.Int) *big.Int { - if limit.Cmp(bigZero) == 0 { - return big.NewInt(0) - } - - n.Mul(n, limit) - return n.Div(n, total) -} diff --git a/pkg/innerring/processors/settlement/basic/distribute_test.go b/pkg/innerring/processors/settlement/basic/distribute_test.go deleted file mode 100644 index 24eb0db3..00000000 --- a/pkg/innerring/processors/settlement/basic/distribute_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package basic - -import ( - "math/big" - "testing" - - "github.com/stretchr/testify/require" -) - -type normalizedValueCase struct { - name string - n, total, limit uint64 - expected uint64 -} - -func TestNormalizedValues(t *testing.T) { - testCases := []normalizedValueCase{ - { - name: "zero limit", - n: 50, - total: 100, - limit: 0, - expected: 0, - }, - { - name: "scale down", - n: 50, - total: 100, - limit: 10, - expected: 5, - }, - { - name: "scale up", - n: 50, - total: 100, - limit: 1000, - expected: 500, - }, - } - - for _, testCase := range testCases { - testNormalizedValues(t, testCase) - } -} - -func testNormalizedValues(t *testing.T, c normalizedValueCase) { - n := big.NewInt(0).SetUint64(c.n) - total := big.NewInt(0).SetUint64(c.total) - limit := big.NewInt(0).SetUint64(c.limit) - exp := big.NewInt(0).SetUint64(c.expected) - - got := normalizedValue(n, total, limit) - require.Zero(t, exp.Cmp(got), c.name) -} diff --git a/pkg/innerring/processors/settlement/basic/util.go b/pkg/innerring/processors/settlement/basic/util.go deleted file mode 100644 index 258bae46..00000000 --- a/pkg/innerring/processors/settlement/basic/util.go +++ /dev/null @@ -1,44 +0,0 @@ -package basic - -import ( - "math/big" -) - -// NodeSizeTable is not thread safe, make sure it is accessed with external -// locks or in single routine. -type NodeSizeTable struct { - prices map[string]uint64 - total uint64 -} - -func (t *NodeSizeTable) Put(id []byte, avg uint64) { - t.prices[string(id)] += avg - t.total += avg -} - -func (t *NodeSizeTable) Total() *big.Int { - return big.NewInt(0).SetUint64(t.total) -} - -func (t *NodeSizeTable) Iterate(f func([]byte, *big.Int)) { - for k, v := range t.prices { - n := big.NewInt(0).SetUint64(v) - f([]byte(k), n) - } -} - -func NewNodeSizeTable() *NodeSizeTable { - return &NodeSizeTable{ - prices: make(map[string]uint64), - } -} - -type nodeInfoWrapper []byte - -func (nodeInfoWrapper) Price() *big.Int { - panic("should not be used") -} - -func (n nodeInfoWrapper) PublicKey() []byte { - return n -} diff --git a/pkg/innerring/processors/settlement/calls.go b/pkg/innerring/processors/settlement/calls.go deleted file mode 100644 index 2687ad20..00000000 --- a/pkg/innerring/processors/settlement/calls.go +++ /dev/null @@ -1,133 +0,0 @@ -package settlement - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "go.uber.org/zap" -) - -// HandleAuditEvent catches a new AuditEvent and -// adds AuditProcessor call to the execution queue. -func (p *Processor) HandleAuditEvent(e event.Event) { - ev := e.(AuditEvent) - - epoch := ev.Epoch() - - if !p.state.IsAlphabet() { - p.log.Info(logs.SettlementNonAlphabetModeIgnoreAuditPayments) - - return - } - - log := &logger.Logger{Logger: p.log.With( - zap.Uint64("epoch", epoch), - )} - - log.Info(logs.SettlementNewAuditSettlementEvent) - - if epoch == 0 { - log.Debug(logs.SettlementIgnoreGenesisEpoch) - return - } - - handler := &auditEventHandler{ - log: log, - epoch: epoch, - proc: p.auditProc, - } - - err := p.pool.Submit(handler.handle) - if err != nil { - log.Warn(logs.SettlementCouldNotAddHandlerOfAuditEventToQueue, - zap.String("error", err.Error()), - ) - - return - } - - log.Debug(logs.SettlementAuditEventHandlingSuccessfullyScheduled) -} - -func (p *Processor) HandleIncomeCollectionEvent(e event.Event) { - ev := e.(BasicIncomeCollectEvent) - epoch := ev.Epoch() - - if !p.state.IsAlphabet() { - p.log.Info(logs.SettlementNonAlphabetModeIgnoreIncomeCollectionEvent) - - return - } - - p.log.Info(logs.SettlementStartBasicIncomeCollection, - zap.Uint64("epoch", epoch)) - - p.contextMu.Lock() - defer p.contextMu.Unlock() - - if _, ok := p.incomeContexts[epoch]; ok { - p.log.Error(logs.SettlementIncomeContextAlreadyExists, - zap.Uint64("epoch", epoch)) - - return - } - - incomeCtx, err := p.basicIncome.CreateContext(epoch) - if err != nil { - p.log.Error(logs.SettlementCantCreateIncomeContext, - zap.String("error", err.Error())) - - return - } - - p.incomeContexts[epoch] = incomeCtx - - err = p.pool.Submit(func() { - incomeCtx.Collect() - }) - if err != nil { - p.log.Warn(logs.SettlementCouldNotAddHandlerOfBasicIncomeCollectionToQueue, - zap.String("error", err.Error()), - ) - - return - } -} - -func (p *Processor) HandleIncomeDistributionEvent(e event.Event) { - ev := e.(BasicIncomeDistributeEvent) - epoch := ev.Epoch() - - if !p.state.IsAlphabet() { - p.log.Info(logs.SettlementNonAlphabetModeIgnoreIncomeDistributionEvent) - - return - } - - p.log.Info(logs.SettlementStartBasicIncomeDistribution, - zap.Uint64("epoch", epoch)) - - p.contextMu.Lock() - defer p.contextMu.Unlock() - - incomeCtx, ok := p.incomeContexts[epoch] - delete(p.incomeContexts, epoch) - - if !ok { - p.log.Warn(logs.SettlementIncomeContextDistributionDoesNotExists, - zap.Uint64("epoch", epoch)) - - return - } - - err := p.pool.Submit(func() { - incomeCtx.Distribute() - }) - if err != nil { - p.log.Warn(logs.SettlementCouldNotAddHandlerOfBasicIncomeDistributionToQueue, - zap.String("error", err.Error()), - ) - - return - } -} diff --git a/pkg/innerring/processors/settlement/common/details.go b/pkg/innerring/processors/settlement/common/details.go deleted file mode 100644 index 1cf719f6..00000000 --- a/pkg/innerring/processors/settlement/common/details.go +++ /dev/null @@ -1,33 +0,0 @@ -package common - -import ( - "encoding/binary" -) - -var ( - auditPrefix = []byte{0x40} - basicIncomeCollectionPrefix = []byte{0x41} - basicIncomeDistributionPrefix = []byte{0x42} -) - -func AuditSettlementDetails(epoch uint64) []byte { - return details(auditPrefix, epoch) -} - -func BasicIncomeCollectionDetails(epoch uint64) []byte { - return details(basicIncomeCollectionPrefix, epoch) -} - -func BasicIncomeDistributionDetails(epoch uint64) []byte { - return details(basicIncomeDistributionPrefix, epoch) -} - -func details(prefix []byte, epoch uint64) []byte { - prefixLen := len(prefix) - buf := make([]byte, prefixLen+8) - - copy(buf, prefix) - binary.LittleEndian.PutUint64(buf[prefixLen:], epoch) - - return buf -} diff --git a/pkg/innerring/processors/settlement/common/details_test.go b/pkg/innerring/processors/settlement/common/details_test.go deleted file mode 100644 index 9755e6ae..00000000 --- a/pkg/innerring/processors/settlement/common/details_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package common - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestAuditSettlementDetails(t *testing.T) { - var n uint64 = 1994 // 0x7CA - exp := []byte{0x40, 0xCA, 0x07, 0, 0, 0, 0, 0, 0} - got := AuditSettlementDetails(n) - require.Equal(t, exp, got) -} - -func TestBasicIncomeCollectionDetails(t *testing.T) { - var n uint64 = 1994 // 0x7CA - exp := []byte{0x41, 0xCA, 0x07, 0, 0, 0, 0, 0, 0} - got := BasicIncomeCollectionDetails(n) - require.Equal(t, exp, got) -} - -func TestBasicIncomeDistributionDetails(t *testing.T) { - var n uint64 = 1994 // 0x7CA - exp := []byte{0x42, 0xCA, 0x07, 0, 0, 0, 0, 0, 0} - got := BasicIncomeDistributionDetails(n) - require.Equal(t, exp, got) -} diff --git a/pkg/innerring/processors/settlement/common/types.go b/pkg/innerring/processors/settlement/common/types.go deleted file mode 100644 index 9dca0fd0..00000000 --- a/pkg/innerring/processors/settlement/common/types.go +++ /dev/null @@ -1,54 +0,0 @@ -package common - -import ( - "math/big" - - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" -) - -// NodeInfo groups the data about the storage node -// necessary for calculating audit fees. -type NodeInfo interface { - // Must return storage price of the node for one epoch in GASe-12. - Price() *big.Int - - // Must return public key of the node. - PublicKey() []byte -} - -// ContainerInfo groups the data about FrostFS container -// necessary for calculating audit fee. -type ContainerInfo interface { - // Must return identifier of the container owner. - Owner() user.ID -} - -// ContainerStorage is an interface of -// storage of the FrostFS containers. -type ContainerStorage interface { - // Must return information about the container by ID. - ContainerInfo(cid.ID) (ContainerInfo, error) -} - -// PlacementCalculator is a component interface -// that builds placement vectors. -type PlacementCalculator interface { - // Must return information about the nodes from container by its ID of the given epoch. - ContainerNodes(uint64, cid.ID) ([]NodeInfo, error) -} - -// AccountStorage is an network member accounts interface. -type AccountStorage interface { - // Must resolve information about the storage node - // to its ID in system. - ResolveKey(NodeInfo) (*user.ID, error) -} - -// Exchanger is an interface of monetary component. -type Exchanger interface { - // Must transfer amount of GASe-12 from sender to recipient. - // - // Amount must be positive. - Transfer(sender, recipient user.ID, amount *big.Int, details []byte) -} diff --git a/pkg/innerring/processors/settlement/common/util.go b/pkg/innerring/processors/settlement/common/util.go deleted file mode 100644 index 6f40fb57..00000000 --- a/pkg/innerring/processors/settlement/common/util.go +++ /dev/null @@ -1,74 +0,0 @@ -package common - -import ( - "math/big" - - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" -) - -type TransferTable struct { - txs map[string]map[string]*TransferTx -} - -type TransferTx struct { - From, To user.ID - - Amount *big.Int -} - -func NewTransferTable() *TransferTable { - return &TransferTable{ - txs: make(map[string]map[string]*TransferTx), - } -} - -func (t *TransferTable) Transfer(tx *TransferTx) { - if tx.From.Equals(tx.To) { - return - } - - from, to := tx.From.EncodeToString(), tx.To.EncodeToString() - - m, ok := t.txs[from] - if !ok { - if m, ok = t.txs[to]; ok { - to = from // ignore `From = To` swap because `From` doesn't require - tx.Amount.Neg(tx.Amount) - } else { - m = make(map[string]*TransferTx, 1) - t.txs[from] = m - } - } - - tgt, ok := m[to] - if !ok { - m[to] = tx - return - } - - tgt.Amount.Add(tgt.Amount, tx.Amount) -} - -func (t *TransferTable) Iterate(f func(*TransferTx)) { - for _, m := range t.txs { - for _, tx := range m { - f(tx) - } - } -} - -func TransferAssets(e Exchanger, t *TransferTable, details []byte) { - t.Iterate(func(tx *TransferTx) { - sign := tx.Amount.Sign() - if sign == 0 { - return - } - - if sign < 0 { - tx.From, tx.To = tx.To, tx.From - tx.Amount.Neg(tx.Amount) - } - - e.Transfer(tx.From, tx.To, tx.Amount, details) - }) -} diff --git a/pkg/innerring/processors/settlement/deps.go b/pkg/innerring/processors/settlement/deps.go deleted file mode 100644 index 37d7955a..00000000 --- a/pkg/innerring/processors/settlement/deps.go +++ /dev/null @@ -1,17 +0,0 @@ -package settlement - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/basic" -) - -// AuditProcessor is an interface of data audit fee processor. -type AuditProcessor interface { - // Must process data audit conducted in epoch. - ProcessAuditSettlements(epoch uint64) -} - -// BasicIncomeInitializer is an interface of basic income context creator. -type BasicIncomeInitializer interface { - // Creates context that processes basic income for provided epoch. - CreateContext(epoch uint64) (*basic.IncomeSettlementContext, error) -} diff --git a/pkg/innerring/processors/settlement/events.go b/pkg/innerring/processors/settlement/events.go deleted file mode 100644 index a47a3e89..00000000 --- a/pkg/innerring/processors/settlement/events.go +++ /dev/null @@ -1,46 +0,0 @@ -package settlement - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" -) - -// AuditEvent is an event of the start of -// cash settlements for data audit. -type AuditEvent struct { - epoch uint64 -} - -type ( - BasicIncomeCollectEvent = AuditEvent - BasicIncomeDistributeEvent = AuditEvent -) - -// MorphEvent implements Neo:Morph event. -func (e AuditEvent) MorphEvent() {} - -// NewAuditEvent creates new AuditEvent for epoch. -func NewAuditEvent(epoch uint64) event.Event { - return AuditEvent{ - epoch: epoch, - } -} - -// Epoch returns the number of the epoch -// in which the event was generated. -func (e AuditEvent) Epoch() uint64 { - return e.epoch -} - -// NewBasicIncomeCollectEvent for epoch. -func NewBasicIncomeCollectEvent(epoch uint64) event.Event { - return BasicIncomeCollectEvent{ - epoch: epoch, - } -} - -// NewBasicIncomeDistributeEvent for epoch. -func NewBasicIncomeDistributeEvent(epoch uint64) event.Event { - return BasicIncomeDistributeEvent{ - epoch: epoch, - } -} diff --git a/pkg/innerring/processors/settlement/handlers.go b/pkg/innerring/processors/settlement/handlers.go deleted file mode 100644 index e69d829e..00000000 --- a/pkg/innerring/processors/settlement/handlers.go +++ /dev/null @@ -1,22 +0,0 @@ -package settlement - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" -) - -type auditEventHandler struct { - log *logger.Logger - - epoch uint64 - - proc AuditProcessor -} - -func (p *auditEventHandler) handle() { - p.log.Info(logs.SettlementProcessAuditSettlements) - - p.proc.ProcessAuditSettlements(p.epoch) - - p.log.Info(logs.SettlementAuditProcessingFinished) -} diff --git a/pkg/innerring/processors/settlement/opts.go b/pkg/innerring/processors/settlement/opts.go deleted file mode 100644 index b344f98d..00000000 --- a/pkg/innerring/processors/settlement/opts.go +++ /dev/null @@ -1,31 +0,0 @@ -package settlement - -import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "go.uber.org/zap" -) - -// Option is a Processor constructor's option. -type Option func(*options) - -type options struct { - poolSize int - - log *logger.Logger -} - -func defaultOptions() *options { - const poolSize = 10 - - return &options{ - poolSize: poolSize, - log: &logger.Logger{Logger: zap.L()}, - } -} - -// WithLogger returns option to override the component for logging. -func WithLogger(l *logger.Logger) Option { - return func(o *options) { - o.log = l - } -} diff --git a/pkg/innerring/processors/settlement/processor.go b/pkg/innerring/processors/settlement/processor.go deleted file mode 100644 index 1870a035..00000000 --- a/pkg/innerring/processors/settlement/processor.go +++ /dev/null @@ -1,79 +0,0 @@ -package settlement - -import ( - "fmt" - "sync" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/basic" - nodeutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - "github.com/panjf2000/ants/v2" - "go.uber.org/zap" -) - -type ( - // AlphabetState is a callback interface for inner ring global state. - AlphabetState interface { - IsAlphabet() bool - } - - // Processor is an event handler for payments in the system. - Processor struct { - log *logger.Logger - - state AlphabetState - - pool nodeutil.WorkerPool - - auditProc AuditProcessor - - basicIncome BasicIncomeInitializer - - contextMu sync.Mutex - incomeContexts map[uint64]*basic.IncomeSettlementContext - } - - // Prm groups the required parameters of Processor's constructor. - Prm struct { - AuditProcessor AuditProcessor - BasicIncome BasicIncomeInitializer - State AlphabetState - } -) - -func panicOnPrmValue(n string, v any) { - panic(fmt.Sprintf("invalid parameter %s (%T):%v", n, v, v)) -} - -// New creates and returns a new Processor instance. -func New(prm Prm, opts ...Option) *Processor { - switch { - case prm.AuditProcessor == nil: - panicOnPrmValue("AuditProcessor", prm.AuditProcessor) - } - - o := defaultOptions() - - for i := range opts { - opts[i](o) - } - - pool, err := ants.NewPool(o.poolSize, ants.WithNonblocking(true)) - if err != nil { - panic(fmt.Errorf("could not create worker pool: %w", err)) - } - - o.log.Debug(logs.SettlementWorkerPoolForSettlementProcessorSuccessfullyInitialized, - zap.Int("capacity", o.poolSize), - ) - - return &Processor{ - log: o.log, - state: prm.State, - pool: pool, - auditProc: prm.AuditProcessor, - basicIncome: prm.BasicIncome, - incomeContexts: make(map[uint64]*basic.IncomeSettlementContext), - } -} diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go deleted file mode 100644 index 27338eaa..00000000 --- a/pkg/innerring/rpc.go +++ /dev/null @@ -1,236 +0,0 @@ -package innerring - -import ( - "context" - "crypto/ecdsa" - "fmt" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - storagegroup2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup" - frostfsapiclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/internal/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/auditor" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/storagegroup" - "go.uber.org/zap" -) - -type ( - ClientCache struct { - log *logger.Logger - cache interface { - Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error) - CloseAll() - } - key *ecdsa.PrivateKey - - sgTimeout, headTimeout, rangeTimeout time.Duration - } - - clientCacheParams struct { - Log *logger.Logger - Key *ecdsa.PrivateKey - - AllowExternal bool - - SGTimeout, HeadTimeout, RangeTimeout time.Duration - } -) - -func newClientCache(p *clientCacheParams) *ClientCache { - return &ClientCache{ - log: p.Log, - cache: cache.NewSDKClientCache(cache.ClientCacheOpts{AllowExternal: p.AllowExternal, Key: p.Key}), - key: p.Key, - sgTimeout: p.SGTimeout, - headTimeout: p.HeadTimeout, - rangeTimeout: p.RangeTimeout, - } -} - -func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.Client, error) { - // Because cache is used by `ClientCache` exclusively, - // client will always have valid key. - return c.cache.Get(info) -} - -// GetSG polls the container to get the object by id. -// Returns storage groups structure from received object. -// -// Returns an error of type apistatus.ObjectNotFound if storage group is missing. -func (c *ClientCache) GetSG(ctx context.Context, prm storagegroup2.GetSGPrm) (*storagegroup.StorageGroup, error) { - var sgAddress oid.Address - sgAddress.SetContainer(prm.CID) - sgAddress.SetObject(prm.OID) - - return c.getSG(ctx, sgAddress, &prm.NetMap, prm.Container) -} - -func (c *ClientCache) getSG(ctx context.Context, addr oid.Address, nm *netmap.NetMap, cn [][]netmap.NodeInfo) (*storagegroup.StorageGroup, error) { - obj := addr.Object() - - nodes, err := placement.BuildObjectPlacement(nm, cn, &obj) - if err != nil { - return nil, fmt.Errorf("can't build object placement: %w", err) - } - - var info clientcore.NodeInfo - - var getObjPrm frostfsapiclient.GetObjectPrm - getObjPrm.SetAddress(addr) - - for _, node := range placement.FlattenNodes(nodes) { - err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(node)) - if err != nil { - return nil, fmt.Errorf("parse client node info: %w", err) - } - - cli, err := c.getWrappedClient(info) - if err != nil { - c.log.Warn(logs.InnerringCantSetupRemoteConnection, - zap.String("error", err.Error())) - - continue - } - - ctx, cancel := context.WithTimeout(ctx, c.sgTimeout) - - // NOTE: we use the function which does not verify object integrity (checksums, signature), - // but it would be useful to do as part of a data audit. - res, err := cli.GetObject(ctx, getObjPrm) - - cancel() - - if err != nil { - c.log.Warn(logs.InnerringCantGetStorageGroupObject, - zap.String("error", err.Error())) - - continue - } - - var sg storagegroup.StorageGroup - - err = storagegroup.ReadFromObject(&sg, *res.Object()) - if err != nil { - return nil, fmt.Errorf("can't parse storage group from a object: %w", err) - } - - return &sg, nil - } - - var errNotFound apistatus.ObjectNotFound - - return nil, errNotFound -} - -// GetHeader requests node from the container under audit to return object header by id. -func (c *ClientCache) GetHeader(ctx context.Context, prm auditor.GetHeaderPrm) (*object.Object, error) { - var objAddress oid.Address - objAddress.SetContainer(prm.CID) - objAddress.SetObject(prm.OID) - - var info clientcore.NodeInfo - - err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(prm.Node)) - if err != nil { - return nil, fmt.Errorf("parse client node info: %w", err) - } - - cli, err := c.getWrappedClient(info) - if err != nil { - return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err) - } - - cctx, cancel := context.WithTimeout(ctx, c.headTimeout) - - var obj *object.Object - - if prm.NodeIsRelay { - obj, err = frostfsapiclient.GetObjectHeaderFromContainer(cctx, cli, objAddress) - } else { - obj, err = frostfsapiclient.GetRawObjectHeaderLocally(cctx, cli, objAddress) - } - - cancel() - - if err != nil { - return nil, fmt.Errorf("object head error: %w", err) - } - - return obj, nil -} - -// GetRangeHash requests node from the container under audit to return Tillich-Zemor hash of the -// payload range of the object with specified identifier. -func (c *ClientCache) GetRangeHash(ctx context.Context, prm auditor.GetRangeHashPrm) ([]byte, error) { - var objAddress oid.Address - objAddress.SetContainer(prm.CID) - objAddress.SetObject(prm.OID) - - var info clientcore.NodeInfo - - err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(prm.Node)) - if err != nil { - return nil, fmt.Errorf("parse client node info: %w", err) - } - - cli, err := c.getWrappedClient(info) - if err != nil { - return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err) - } - - cctx, cancel := context.WithTimeout(ctx, c.rangeTimeout) - - h, err := frostfsapiclient.HashObjectRange(cctx, cli, objAddress, prm.Range) - - cancel() - - if err != nil { - return nil, fmt.Errorf("object rangehash error: %w", err) - } - - return h, nil -} - -func (c *ClientCache) getWrappedClient(info clientcore.NodeInfo) (frostfsapiclient.Client, error) { - // can be also cached - var cInternal frostfsapiclient.Client - - cli, err := c.Get(info) - if err != nil { - return cInternal, fmt.Errorf("could not get API client from cache") - } - - cInternal.WrapBasicClient(cli) - cInternal.SetPrivateKey(c.key) - - return cInternal, nil -} - -func (c ClientCache) ListSG(ctx context.Context, dst *storagegroup2.SearchSGDst, prm storagegroup2.SearchSGPrm) error { - cli, err := c.getWrappedClient(prm.NodeInfo) - if err != nil { - return fmt.Errorf("could not get API client from cache") - } - - var cliPrm frostfsapiclient.SearchSGPrm - - cliPrm.SetContainerID(prm.Container) - - res, err := cli.SearchSG(ctx, cliPrm) - if err != nil { - return err - } - - dst.Objects = res.IDList() - - return nil -} diff --git a/pkg/innerring/settlement.go b/pkg/innerring/settlement.go deleted file mode 100644 index 90255f5c..00000000 --- a/pkg/innerring/settlement.go +++ /dev/null @@ -1,301 +0,0 @@ -package innerring - -import ( - "context" - "crypto/ecdsa" - "crypto/elliptic" - "crypto/sha256" - "encoding/hex" - "fmt" - "math/big" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/basic" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common" - auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit" - balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" - containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" - netmapClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - auditAPI "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/audit" - containerAPI "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - netmapAPI "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/storagegroup" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" - "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "go.uber.org/zap" -) - -const ( - auditSettlementContext = "audit" - basicIncomeSettlementContext = "basic income" -) - -type settlementDeps struct { - log *logger.Logger - - cnrSrc container.Source - - auditClient *auditClient.Client - - nmClient *netmapClient.Client - - clientCache *ClientCache - - balanceClient *balanceClient.Client - - settlementCtx string -} - -type auditSettlementDeps struct { - settlementDeps -} - -type basicIncomeSettlementDeps struct { - settlementDeps - cnrClient *containerClient.Client -} - -type basicSettlementConstructor struct { - dep *basicIncomeSettlementDeps -} - -type auditSettlementCalculator audit.Calculator - -type containerWrapper containerAPI.Container - -type nodeInfoWrapper struct { - ni netmapAPI.NodeInfo -} - -type sgWrapper storagegroup.StorageGroup - -func (s *sgWrapper) Size() uint64 { - return (*storagegroup.StorageGroup)(s).ValidationDataSize() -} - -func (n nodeInfoWrapper) PublicKey() []byte { - return n.ni.PublicKey() -} - -func (n nodeInfoWrapper) Price() *big.Int { - return big.NewInt(int64(n.ni.Price())) -} - -func (c containerWrapper) Owner() user.ID { - return (containerAPI.Container)(c).Owner() -} - -func (s settlementDeps) AuditResultsForEpoch(epoch uint64) ([]*auditAPI.Result, error) { - idList, err := s.auditClient.ListAuditResultIDByEpoch(epoch) - if err != nil { - return nil, fmt.Errorf("could not list audit results in sidechain: %w", err) - } - - res := make([]*auditAPI.Result, 0, len(idList)) - - for i := range idList { - r, err := s.auditClient.GetAuditResult(idList[i]) - if err != nil { - return nil, fmt.Errorf("could not get audit result: %w", err) - } - - res = append(res, r) - } - - return res, nil -} - -func (s settlementDeps) ContainerInfo(cid cid.ID) (common.ContainerInfo, error) { - cnr, err := s.cnrSrc.Get(cid) - if err != nil { - return nil, fmt.Errorf("could not get container from storage: %w", err) - } - - return (containerWrapper)(cnr.Value), nil -} - -func (s settlementDeps) buildContainer(e uint64, cid cid.ID) ([][]netmapAPI.NodeInfo, *netmapAPI.NetMap, error) { - var ( - nm *netmapAPI.NetMap - err error - ) - - if e > 0 { - nm, err = s.nmClient.GetNetMapByEpoch(e) - } else { - nm, err = netmap.GetLatestNetworkMap(s.nmClient) - } - - if err != nil { - return nil, nil, fmt.Errorf("could not get network map from storage: %w", err) - } - - cnr, err := s.cnrSrc.Get(cid) - if err != nil { - return nil, nil, fmt.Errorf("could not get container from sidechain: %w", err) - } - - binCnr := make([]byte, sha256.Size) - cid.Encode(binCnr) - - cn, err := nm.ContainerNodes( - cnr.Value.PlacementPolicy(), - binCnr, // may be replace pivot calculation to frostfs-api-go - ) - if err != nil { - return nil, nil, fmt.Errorf("could not calculate container nodes: %w", err) - } - - return cn, nm, nil -} - -func (s settlementDeps) ContainerNodes(e uint64, cid cid.ID) ([]common.NodeInfo, error) { - cn, _, err := s.buildContainer(e, cid) - if err != nil { - return nil, err - } - - var sz int - - for i := range cn { - sz += len(cn[i]) - } - - res := make([]common.NodeInfo, 0, sz) - - for i := range cn { - for j := range cn[i] { - res = append(res, nodeInfoWrapper{ - ni: cn[i][j], - }) - } - } - - return res, nil -} - -// SGInfo returns audit.SGInfo by object address. -// -// Returns an error of type apistatus.ObjectNotFound if storage group is missing. -func (s settlementDeps) SGInfo(addr oid.Address) (audit.SGInfo, error) { - cnr := addr.Container() - - cn, nm, err := s.buildContainer(0, cnr) - if err != nil { - return nil, err - } - - sg, err := s.clientCache.getSG(context.Background(), addr, nm, cn) - if err != nil { - return nil, err - } - - return (*sgWrapper)(sg), nil -} - -func (s settlementDeps) ResolveKey(ni common.NodeInfo) (*user.ID, error) { - pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256()) - if err != nil { - return nil, err - } - - var id user.ID - user.IDFromKey(&id, (ecdsa.PublicKey)(*pub)) - - return &id, nil -} - -func (s settlementDeps) Transfer(sender, recipient user.ID, amount *big.Int, details []byte) { - if s.settlementCtx == "" { - panic("unknown settlement deps context") - } - - log := s.log.With( - zap.Stringer("sender", sender), - zap.Stringer("recipient", recipient), - zap.Stringer("amount (GASe-12)", amount), - zap.String("details", hex.EncodeToString(details)), - ) - - if !amount.IsInt64() { - s.log.Error(logs.InnerringAmountCanNotBeRepresentedAsAnInt64) - - return - } - - params := balanceClient.TransferPrm{ - Amount: amount.Int64(), - From: sender, - To: recipient, - Details: details, - } - - err := s.balanceClient.TransferX(params) - if err != nil { - log.Error(fmt.Sprintf("%s: could not send transfer", s.settlementCtx), - zap.String("error", err.Error()), - ) - - return - } - - log.Debug(fmt.Sprintf("%s: transfer was successfully sent", s.settlementCtx)) -} - -func (b basicIncomeSettlementDeps) BasicRate() (uint64, error) { - return b.nmClient.BasicIncomeRate() -} - -func (b basicIncomeSettlementDeps) Estimations(epoch uint64) ([]*containerClient.Estimations, error) { - estimationIDs, err := b.cnrClient.ListLoadEstimationsByEpoch(epoch) - if err != nil { - return nil, err - } - - result := make([]*containerClient.Estimations, 0, len(estimationIDs)) - - for i := range estimationIDs { - estimation, err := b.cnrClient.GetUsedSpaceEstimations(estimationIDs[i]) - if err != nil { - b.log.Warn(logs.InnerringCantGetUsedSpaceEstimation, - zap.String("estimation_id", hex.EncodeToString(estimationIDs[i])), - zap.String("error", err.Error())) - - continue - } - - result = append(result, estimation) - } - - return result, nil -} - -func (b basicIncomeSettlementDeps) Balance(id user.ID) (*big.Int, error) { - return b.balanceClient.BalanceOf(id) -} - -func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) { - (*audit.Calculator)(s).Calculate(&audit.CalculatePrm{ - Epoch: epoch, - }) -} - -func (b *basicSettlementConstructor) CreateContext(epoch uint64) (*basic.IncomeSettlementContext, error) { - return basic.NewIncomeSettlementContext(&basic.IncomeSettlementContextPrms{ - Log: b.dep.log, - Epoch: epoch, - Rate: b.dep, - Estimations: b.dep, - Balances: b.dep, - Container: b.dep, - Placement: b.dep, - Exchange: b.dep, - Accounts: b.dep, - }), nil -} diff --git a/pkg/innerring/state.go b/pkg/innerring/state.go index e3bf7886..6a6ca0ad 100644 --- a/pkg/innerring/state.go +++ b/pkg/innerring/state.go @@ -6,8 +6,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" - auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit" control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state" "github.com/nspcc-dev/neo-go/pkg/util" @@ -146,18 +144,6 @@ func (s *Server) VoteForSidechainValidator(prm governance.VoteValidatorPrm) erro return s.voteForSidechainValidator(prm) } -// WriteReport composes the audit result structure from the audit report -// and sends it to Audit contract. -func (s *Server) WriteReport(r *audit.Report) error { - res := r.Result() - res.SetAuditorKey(s.pubKey) - - prm := auditClient.PutPrm{} - prm.SetResult(res) - - return s.auditClient.PutAuditResult(prm) -} - // ResetEpochTimer resets the block timer that produces events to update epoch // counter in the netmap contract. It is used to synchronize this even production // based on the block with a notification of the last epoch.