[#248] innerring: Remove audit and settlement code

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
feat/299
Evgenii Stratonikov 2023-04-25 09:16:14 +03:00
parent 8b2aae73c6
commit 8879c6ea4a
36 changed files with 8 additions and 3008 deletions

View File

@ -5,10 +5,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement"
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/timer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -25,12 +23,6 @@ type (
IsAlphabet() bool
}
subEpochEventHandler struct {
handler event.Handler // handle to execute
durationMul uint32 // X: X/Y of epoch in blocks
durationDiv uint32 // Y: X/Y of epoch in blocks
}
newEpochHandler func()
epochTimerArgs struct {
@ -45,9 +37,6 @@ type (
stopEstimationDMul uint32 // X: X/Y of epoch in blocks
stopEstimationDDiv uint32 // Y: X/Y of epoch in blocks
collectBasicIncome subEpochEventHandler
distributeBasicIncome subEpochEventHandler
}
emitTimerArgs struct {
@ -119,34 +108,6 @@ func newEpochTimer(args *epochTimerArgs) *timer.BlockTimer {
}
})
epochTimer.OnDelta(
args.collectBasicIncome.durationMul,
args.collectBasicIncome.durationDiv,
func() {
epochN := args.epoch.EpochCounter()
if epochN == 0 { // estimates are invalid in genesis epoch
return
}
args.collectBasicIncome.handler(
settlement.NewBasicIncomeCollectEvent(epochN - 1),
)
})
epochTimer.OnDelta(
args.distributeBasicIncome.durationMul,
args.distributeBasicIncome.durationDiv,
func() {
epochN := args.epoch.EpochCounter()
if epochN == 0 { // estimates are invalid in genesis epoch
return
}
args.distributeBasicIncome.handler(
settlement.NewBasicIncomeDistributeEvent(epochN - 1),
)
})
return epochTimer
}

View File

@ -15,7 +15,6 @@ type contracts struct {
netmap util.Uint160 // in morph
balance util.Uint160 // in morph
container util.Uint160 // in morph
audit util.Uint160 // in morph
proxy util.Uint160 // in morph
processing util.Uint160 // in mainnet
frostfsID util.Uint160 // in morph
@ -58,7 +57,6 @@ func parseContracts(cfg *viper.Viper, morph *client.Client, withoutMainNet, with
{"contracts.netmap", client.NNSNetmapContractName, &result.netmap},
{"contracts.balance", client.NNSBalanceContractName, &result.balance},
{"contracts.container", client.NNSContainerContractName, &result.container},
{"contracts.audit", client.NNSAuditContractName, &result.audit},
{"contracts.frostfsid", client.NNSFrostFSIDContractName, &result.frostfsID},
}

View File

@ -8,7 +8,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance"
cont "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs"
@ -17,25 +16,19 @@ import (
nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation"
addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress"
statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement"
auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit"
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager"
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server"
util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -43,9 +36,7 @@ import (
func (s *Server) initNetmapProcessor(cfg *viper.Viper,
cnrClient *container.Client,
alphaSync event.Handler,
auditProcessor *audit.Processor,
settlementProcessor *settlement.Processor) error {
alphaSync event.Handler) error {
locodeValidator, err := s.newLocodeValidator(cfg)
if err != nil {
return err
@ -70,15 +61,9 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
ContainerWrapper: cnrClient,
HandleAudit: s.onlyActiveEventHandler(
auditProcessor.StartAuditHandler(),
),
NotaryDepositHandler: s.onlyAlphabetEventHandler(
s.notaryHandler,
),
AuditSettlementsHandler: s.onlyAlphabetEventHandler(
settlementProcessor.HandleAuditEvent,
),
AlphabetSyncHandler: s.onlyAlphabetEventHandler(
alphaSync,
),
@ -171,93 +156,6 @@ func (s *Server) initNotaryConfig() {
)
}
func (s *Server) createAuditProcessor(cfg *viper.Viper, clientCache *ClientCache, cnrClient *container.Client) (*audit.Processor, error) {
auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size"))
if err != nil {
return nil, err
}
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
porPoolSize := cfg.GetInt("audit.por.pool_size")
// create audit processor dependencies
auditTaskManager := audittask.New(
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
audittask.WithWorkerPool(auditPool),
audittask.WithLogger(s.log),
audittask.WithContainerCommunicator(clientCache),
audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")),
audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(pdpPoolSize)
}),
audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(porPoolSize)
}),
)
s.workers = append(s.workers, auditTaskManager.Listen)
// create audit processor
return audit.New(&audit.Params{
Log: s.log,
NetmapClient: s.netmapClient,
ContainerClient: cnrClient,
IRList: s,
EpochSource: s,
SGSource: clientCache,
Key: &s.key.PrivateKey,
RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"),
TaskManager: auditTaskManager,
Reporter: s,
})
}
func (s *Server) createSettlementProcessor(clientCache *ClientCache, cnrClient *container.Client) *settlement.Processor {
// create settlement processor dependencies
settlementDeps := settlementDeps{
log: s.log,
cnrSrc: container.AsContainerSource(cnrClient),
auditClient: s.auditClient,
nmClient: s.netmapClient,
clientCache: clientCache,
balanceClient: s.balanceClient,
}
settlementDeps.settlementCtx = auditSettlementContext
auditCalcDeps := &auditSettlementDeps{
settlementDeps: settlementDeps,
}
settlementDeps.settlementCtx = basicIncomeSettlementContext
basicSettlementDeps := &basicIncomeSettlementDeps{
settlementDeps: settlementDeps,
cnrClient: cnrClient,
}
auditSettlementCalc := auditSettlement.NewCalculator(
&auditSettlement.CalculatorPrm{
ResultStorage: auditCalcDeps,
ContainerStorage: auditCalcDeps,
PlacementCalculator: auditCalcDeps,
SGStorage: auditCalcDeps,
AccountStorage: auditCalcDeps,
Exchanger: auditCalcDeps,
AuditFeeFetcher: s.netmapClient,
},
auditSettlement.WithLogger(s.log),
)
// create settlement processor
return settlement.New(
settlement.Prm{
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps},
State: s,
},
settlement.WithLogger(s.log),
)
}
func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Client, irf irFetcher) (event.Handler, error) {
var alphaSync event.Handler
@ -316,16 +214,6 @@ func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morp
epoch: s,
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),
stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"),
collectBasicIncome: subEpochEventHandler{
handler: processors.SettlementProcessor.HandleIncomeCollectionEvent,
durationMul: cfg.GetUint32("timers.collect_basic_income.mul"),
durationDiv: cfg.GetUint32("timers.collect_basic_income.div"),
},
distributeBasicIncome: subEpochEventHandler{
handler: processors.SettlementProcessor.HandleIncomeDistributionEvent,
durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"),
durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"),
},
})
s.addBlockTimer(s.epochTimer)
@ -493,12 +381,6 @@ func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
var err error
fee := s.feeConfig.SideChainFee()
// do not use TryNotary() in audit wrapper
// audit operations do not require multisignatures
s.auditClient, err = auditClient.NewFromMorph(s.morphClient, s.contracts.audit, fee)
if err != nil {
return nil, err
}
// form morph container client's options
morphCnrOpts := make([]container.Option, 0, 3)
@ -545,8 +427,7 @@ func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
}
type serverProcessors struct {
AlphabetProcessor *alphabet.Processor
SettlementProcessor *settlement.Processor
AlphabetProcessor *alphabet.Processor
}
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) {
@ -561,32 +442,12 @@ func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClien
cfg.GetDuration("indexer.cache_timeout"),
)
clientCache := newClientCache(&clientCacheParams{
Log: s.log,
Key: &s.key.PrivateKey,
SGTimeout: cfg.GetDuration("audit.timeout.get"),
HeadTimeout: cfg.GetDuration("audit.timeout.head"),
RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"),
AllowExternal: cfg.GetBool("audit.allow_external"),
})
s.registerNoErrCloser(clientCache.cache.CloseAll)
// create audit processor
auditProcessor, err := s.createAuditProcessor(cfg, clientCache, morphClients.CnrClient)
alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
if err != nil {
return nil, err
}
result.SettlementProcessor = s.createSettlementProcessor(clientCache, morphClients.CnrClient)
var alphaSync event.Handler
alphaSync, err = s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
if err != nil {
return nil, err
}
err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync, auditProcessor, result.SettlementProcessor)
err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync)
if err != nil {
return nil, err
}

View File

@ -13,7 +13,6 @@ import (
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit"
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
@ -52,7 +51,6 @@ type (
epochDuration atomic.Uint64
statusIndex *innerRingIndexer
precision precision.Fixed8Converter
auditClient *auditClient.Client
healthStatus atomic.Value
balanceClient *balanceClient.Client
netmapClient *nmClient.Client
@ -572,16 +570,6 @@ func (s *Server) nextEpochBlockDelta() (uint32, error) {
return delta - blockHeight, nil
}
// onlyActiveHandler wrapper around event handler that executes it
// only if inner ring node state is active.
func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) {
if s.IsActive() {
f(ev)
}
}
}
// onlyAlphabet wrapper around event handler that executes it
// only if inner ring node is alphabet node.
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {

View File

@ -1,339 +0,0 @@
package frostfsapiclient
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/storagegroup"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// Client represents FrostFS API client cut down to the needs of a purely IR application.
type Client struct {
key *ecdsa.PrivateKey
c clientcore.Client
}
// WrapBasicClient wraps a client.Client instance to use it for FrostFS API RPC.
func (x *Client) WrapBasicClient(c clientcore.Client) {
x.c = c
}
// SetPrivateKey sets a private key to sign RPC requests.
func (x *Client) SetPrivateKey(key *ecdsa.PrivateKey) {
x.key = key
}
// SearchSGPrm groups parameters of SearchSG operation.
type SearchSGPrm struct {
cnrID cid.ID
}
// SetContainerID sets the ID of the container to search for storage groups.
func (x *SearchSGPrm) SetContainerID(id cid.ID) {
x.cnrID = id
}
// SearchSGRes groups the resulting values of SearchSG operation.
type SearchSGRes struct {
cliRes []oid.ID
}
// IDList returns a list of IDs of storage groups in the container.
func (x SearchSGRes) IDList() []oid.ID {
return x.cliRes
}
var sgFilter = storagegroup.SearchQuery()
// SearchSG lists objects of storage group type in the container.
//
// Returns any error which prevented the operation from completing correctly in error return.
func (x Client) SearchSG(ctx context.Context, prm SearchSGPrm) (*SearchSGRes, error) {
var cliPrm client.PrmObjectSearch
cliPrm.InContainer(prm.cnrID)
cliPrm.SetFilters(sgFilter)
cliPrm.UseKey(*x.key)
rdr, err := x.c.ObjectSearchInit(ctx, cliPrm)
if err != nil {
return nil, fmt.Errorf("init object search: %w", err)
}
buf := make([]oid.ID, 10)
var list []oid.ID
var n int
var ok bool
for {
n, ok = rdr.Read(buf)
for i := 0; i < n; i++ {
list = append(list, buf[i])
}
if !ok {
break
}
}
res, err := rdr.Close()
if err == nil {
// pull out an error from status
err = apistatus.ErrFromStatus(res.Status())
}
if err != nil {
return nil, fmt.Errorf("read object list: %w", err)
}
return &SearchSGRes{
cliRes: list,
}, nil
}
// GetObjectPrm groups parameters of GetObject operation.
type GetObjectPrm struct {
getObjectPrm
}
// GetObjectRes groups the resulting values of GetObject operation.
type GetObjectRes struct {
obj *object.Object
}
// Object returns the received object.
func (x GetObjectRes) Object() *object.Object {
return x.obj
}
// GetObject reads the object by address.
//
// Returns any error which prevented the operation from completing correctly in error return.
func (x Client) GetObject(ctx context.Context, prm GetObjectPrm) (*GetObjectRes, error) {
var cliPrm client.PrmObjectGet
cliPrm.FromContainer(prm.objAddr.Container())
cliPrm.ByID(prm.objAddr.Object())
cliPrm.UseKey(*x.key)
rdr, err := x.c.ObjectGetInit(ctx, cliPrm)
if err != nil {
return nil, fmt.Errorf("init object search: %w", err)
}
var obj object.Object
if !rdr.ReadHeader(&obj) {
res, err := rdr.Close()
if err == nil {
// pull out an error from status
err = apistatus.ErrFromStatus(res.Status())
}
return nil, fmt.Errorf("read object header: %w", err)
}
buf := make([]byte, obj.PayloadSize())
_, err = rdr.Read(buf)
if err != nil && !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("read payload: %w", err)
}
obj.SetPayload(buf)
return &GetObjectRes{
obj: &obj,
}, nil
}
// HeadObjectPrm groups parameters of HeadObject operation.
type HeadObjectPrm struct {
getObjectPrm
raw bool
local bool
}
// SetRawFlag sets flag of raw request.
func (x *HeadObjectPrm) SetRawFlag() {
x.raw = true
}
// SetTTL sets request TTL value.
func (x *HeadObjectPrm) SetTTL(ttl uint32) {
x.local = ttl < 2
}
// HeadObjectRes groups the resulting values of HeadObject operation.
type HeadObjectRes struct {
hdr *object.Object
}
// Header returns the received object header.
func (x HeadObjectRes) Header() *object.Object {
return x.hdr
}
// HeadObject reads short object header by address.
//
// Returns any error which prevented the operation from completing correctly in error return.
// For raw requests, returns *object.SplitInfoError error if the requested object is virtual.
func (x Client) HeadObject(ctx context.Context, prm HeadObjectPrm) (*HeadObjectRes, error) {
var cliPrm client.PrmObjectHead
if prm.raw {
cliPrm.MarkRaw()
}
if prm.local {
cliPrm.MarkLocal()
}
cliPrm.FromContainer(prm.objAddr.Container())
cliPrm.ByID(prm.objAddr.Object())
cliPrm.UseKey(*x.key)
cliRes, err := x.c.ObjectHead(ctx, cliPrm)
if err == nil {
// pull out an error from status
err = apistatus.ErrFromStatus(cliRes.Status())
}
if err != nil {
return nil, fmt.Errorf("read object header from FrostFS: %w", err)
}
var hdr object.Object
if !cliRes.ReadHeader(&hdr) {
return nil, errors.New("missing object header in the response")
}
return &HeadObjectRes{
hdr: &hdr,
}, nil
}
// GetObjectPayload reads an object by address from FrostFS via Client and returns its payload.
//
// Returns any error which prevented the operation from completing correctly in error return.
func GetObjectPayload(ctx context.Context, c Client, addr oid.Address) ([]byte, error) {
var prm GetObjectPrm
prm.SetAddress(addr)
obj, err := c.GetObject(ctx, prm)
if err != nil {
return nil, err
}
return obj.Object().Payload(), nil
}
func headObject(ctx context.Context, c Client, addr oid.Address, raw bool, ttl uint32) (*object.Object, error) {
var prm HeadObjectPrm
prm.SetAddress(addr)
prm.SetTTL(ttl)
if raw {
prm.SetRawFlag()
}
obj, err := c.HeadObject(ctx, prm)
if err != nil {
return nil, err
}
return obj.Header(), nil
}
// GetRawObjectHeaderLocally reads the raw short object header from the server's local storage by address via Client.
func GetRawObjectHeaderLocally(ctx context.Context, c Client, addr oid.Address) (*object.Object, error) {
return headObject(ctx, c, addr, true, 1)
}
// GetObjectHeaderFromContainer reads the short object header by address via Client with TTL = 10
// for deep traversal of the container.
func GetObjectHeaderFromContainer(ctx context.Context, c Client, addr oid.Address) (*object.Object, error) {
return headObject(ctx, c, addr, false, 10)
}
// HashPayloadRangePrm groups parameters of HashPayloadRange operation.
type HashPayloadRangePrm struct {
getObjectPrm
rng *object.Range
}
// SetRange sets payload range to calculate the hash.
func (x *HashPayloadRangePrm) SetRange(rng *object.Range) {
x.rng = rng
}
// HashPayloadRangeRes groups the resulting values of HashPayloadRange operation.
type HashPayloadRangeRes struct {
h []byte
}
// Hash returns the hash of the object payload range.
func (x HashPayloadRangeRes) Hash() []byte {
return x.h
}
// HashPayloadRange requests to calculate Tillich-Zemor hash of the payload range of the object
// from the remote server's local storage.
//
// Returns any error which prevented the operation from completing correctly in error return.
func (x Client) HashPayloadRange(ctx context.Context, prm HashPayloadRangePrm) (res HashPayloadRangeRes, err error) {
var cliPrm client.PrmObjectHash
cliPrm.FromContainer(prm.objAddr.Container())
cliPrm.ByID(prm.objAddr.Object())
cliPrm.SetRangeList(prm.rng.GetOffset(), prm.rng.GetLength())
cliPrm.TillichZemorAlgo()
cliRes, err := x.c.ObjectHash(ctx, cliPrm)
if err == nil {
// pull out an error from status
err = apistatus.ErrFromStatus(cliRes.Status())
if err != nil {
return
}
hs := cliRes.Checksums()
if ln := len(hs); ln != 1 {
err = fmt.Errorf("wrong number of checksums %d", ln)
} else {
res.h = hs[0]
}
}
return
}
// HashObjectRange reads Tillich-Zemor hash of the object payload range by address
// from the remote server's local storage via Client.
//
// Returns any error which prevented the operation from completing correctly in error return.
func HashObjectRange(ctx context.Context, c Client, addr oid.Address, rng *object.Range) ([]byte, error) {
var prm HashPayloadRangePrm
prm.SetAddress(addr)
prm.SetRange(rng)
res, err := c.HashPayloadRange(ctx, prm)
if err != nil {
return nil, err
}
return res.Hash(), nil
}

View File

@ -1,12 +0,0 @@
// Package frostfsapiclient provides functionality for IR application communication with FrostFS network.
//
// The basic client for accessing remote nodes via FrostFS API is a FrostFS SDK Go API client.
// However, although it encapsulates a useful piece of business logic (e.g. the signature mechanism),
// the IR application does not fully use the client's flexible interface.
//
// In this regard, this package represents an abstraction -- a type-wrapper over the base client.
// The type provides the minimum interface necessary for the application and also allows you to concentrate
// the entire spectrum of the client's use in one place (this will be convenient both when updating the base client
// and for evaluating the UX of SDK library). So, it is expected that all application packages will be limited
// to this package for the development of functionality requiring FrostFS API communication.
package frostfsapiclient

View File

@ -1,18 +0,0 @@
package frostfsapiclient
import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type objectAddressPrm struct {
objAddr oid.Address
}
// SetAddress sets address of the object.
func (x *objectAddressPrm) SetAddress(addr oid.Address) {
x.objAddr = addr
}
type getObjectPrm struct {
objectAddressPrm
}

View File

@ -1,19 +0,0 @@
package audit
// Start is an event to start a new round of data audit.
type Start struct {
epoch uint64
}
// MorphEvent implements the Event interface.
func (a Start) MorphEvent() {}
func NewAuditStartEvent(epoch uint64) Start {
return Start{
epoch: epoch,
}
}
func (a Start) Epoch() uint64 {
return a.epoch
}

View File

@ -1,22 +0,0 @@
package audit
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"go.uber.org/zap"
)
func (ap *Processor) handleNewAuditRound(ev event.Event) {
auditEvent := ev.(Start)
epoch := auditEvent.Epoch()
ap.log.Info(logs.AuditNewRoundOfAudit, zap.Uint64("epoch", epoch))
// send an event to the worker pool
err := ap.pool.Submit(func() { ap.processStartAudit(epoch) })
if err != nil {
ap.log.Warn(logs.AuditPreviousRoundOfAuditPrepareHasntFinishedYet)
}
}

View File

@ -1,217 +0,0 @@
package audit
import (
"context"
"crypto/sha256"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
func (ap *Processor) processStartAudit(epoch uint64) {
log := ap.log.With(zap.Uint64("epoch", epoch))
ap.prevAuditCanceler()
skipped := ap.taskManager.Reset()
if skipped > 0 {
ap.log.Info(logs.AuditSomeTasksFromPreviousEpochAreSkipped,
zap.Int("amount", skipped),
)
}
containers, err := ap.selectContainersToAudit(epoch)
if err != nil {
log.Error(logs.AuditContainerSelectionFailure, zap.String("error", err.Error()))
return
}
log.Info(logs.AuditSelectContainersForAudit, zap.Int("amount", len(containers)))
nm, err := ap.netmapClient.GetNetMap(0)
if err != nil {
ap.log.Error(logs.AuditCantFetchNetworkMap,
zap.String("error", err.Error()))
return
}
cancelChannel := make(chan struct{})
ap.prevAuditCanceler = func() {
select {
case <-cancelChannel: // already closed
default:
close(cancelChannel)
}
}
pivot := make([]byte, sha256.Size)
ap.startAuditTasksOnContainers(cancelChannel, containers, log, pivot, nm, epoch)
}
func (ap *Processor) startAuditTasksOnContainers(cancelChannel <-chan struct{}, containers []cid.ID, log *zap.Logger, pivot []byte, nm *netmap.NetMap, epoch uint64) {
for i := range containers {
cnr, err := cntClient.Get(ap.containerClient, containers[i]) // get container structure
if err != nil {
log.Error(logs.AuditCantGetContainerInfoIgnore,
zap.Stringer("cid", containers[i]),
zap.String("error", err.Error()))
continue
}
containers[i].Encode(pivot)
// find all container nodes for current epoch
nodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), pivot)
if err != nil {
log.Info(logs.AuditCantBuildPlacementForContainerIgnore,
zap.Stringer("cid", containers[i]),
zap.String("error", err.Error()))
continue
}
n := placement.FlattenNodes(nodes)
// shuffle nodes to ask a random one
rand.Shuffle(len(n), func(i, j int) {
n[i], n[j] = n[j], n[i]
})
// search storage groups
storageGroupsIDs := ap.findStorageGroups(containers[i], n)
log.Info(logs.AuditSelectStorageGroupsForAudit,
zap.Stringer("cid", containers[i]),
zap.Int("amount", len(storageGroupsIDs)))
// filter expired storage groups
storageGroups := ap.filterExpiredSG(containers[i], storageGroupsIDs, nodes, *nm)
log.Info(logs.AuditFilterExpiredStorageGroupsForAudit,
zap.Stringer("cid", containers[i]),
zap.Int("amount", len(storageGroups)))
// skip audit for containers without
// non-expired storage groups
if len(storageGroupsIDs) == 0 {
continue
}
auditTask := new(audit.Task).
WithReporter(&epochAuditReporter{
epoch: epoch,
rep: ap.reporter,
}).
WithCancelChannel(cancelChannel).
WithContainerID(containers[i]).
WithStorageGroupList(storageGroups).
WithContainerStructure(cnr.Value).
WithContainerNodes(nodes).
WithNetworkMap(nm)
ap.taskManager.PushTask(auditTask)
}
}
func (ap *Processor) findStorageGroups(cnr cid.ID, shuffled netmapcore.Nodes) []oid.ID {
var sg []oid.ID
ln := len(shuffled)
var (
info clientcore.NodeInfo
prm storagegroup.SearchSGPrm
)
prm.Container = cnr
for i := range shuffled { // consider iterating over some part of container
log := ap.log.With(
zap.Stringer("cid", cnr),
zap.String("key", netmap.StringifyPublicKey(shuffled[0])),
zap.Int("try", i),
zap.Int("total_tries", ln),
)
err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(shuffled[i]))
if err != nil {
log.Warn(logs.AuditParseClientNodeInfo, zap.String("error", err.Error()))
continue
}
ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)
prm.NodeInfo = info
var dst storagegroup.SearchSGDst
err = ap.sgSrc.ListSG(ctx, &dst, prm)
cancel()
if err != nil {
log.Warn(logs.AuditErrorInStorageGroupSearch, zap.String("error", err.Error()))
continue
}
sg = append(sg, dst.Objects...)
break // we found storage groups, so break loop
}
return sg
}
func (ap *Processor) filterExpiredSG(cid cid.ID, sgIDs []oid.ID,
cnr [][]netmap.NodeInfo, nm netmap.NetMap) []storagegroup.StorageGroup {
sgs := make([]storagegroup.StorageGroup, 0, len(sgIDs))
var coreSG storagegroup.StorageGroup
var getSGPrm storagegroup.GetSGPrm
getSGPrm.CID = cid
getSGPrm.Container = cnr
getSGPrm.NetMap = nm
for _, sgID := range sgIDs {
ctx, cancel := context.WithTimeout(context.Background(), ap.searchTimeout)
getSGPrm.OID = sgID
sg, err := ap.sgSrc.GetSG(ctx, getSGPrm)
cancel()
if err != nil {
ap.log.Error(
"could not get storage group object for audit, skipping",
zap.Stringer("cid", cid),
zap.Stringer("oid", sgID),
zap.Error(err),
)
continue
}
// filter expired epochs
if sg.ExpirationEpoch() >= ap.epochSrc.EpochCounter() {
coreSG.SetID(sgID)
coreSG.SetStorageGroup(*sg)
sgs = append(sgs, coreSG)
}
}
return sgs
}

View File

@ -1,143 +0,0 @@
package audit
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/panjf2000/ants/v2"
)
type (
// Indexer is a callback interface for inner ring global state.
Indexer interface {
InnerRingIndex() int
InnerRingSize() int
}
TaskManager interface {
PushTask(*audit.Task)
// Must skip all tasks planned for execution and
// return their number.
Reset() int
}
// EpochSource is an interface that provides actual
// epoch information.
EpochSource interface {
// EpochCounter must return current epoch number.
EpochCounter() uint64
}
// Processor of events related to data audit.
Processor struct {
log *logger.Logger
pool *ants.Pool
irList Indexer
sgSrc storagegroup.SGSource
epochSrc EpochSource
searchTimeout time.Duration
containerClient *cntClient.Client
netmapClient *nmClient.Client
taskManager TaskManager
reporter audit.Reporter
prevAuditCanceler context.CancelFunc
}
// Params of the processor constructor.
Params struct {
Log *logger.Logger
NetmapClient *nmClient.Client
ContainerClient *cntClient.Client
IRList Indexer
SGSource storagegroup.SGSource
RPCSearchTimeout time.Duration
TaskManager TaskManager
Reporter audit.Reporter
Key *ecdsa.PrivateKey
EpochSource EpochSource
}
)
type epochAuditReporter struct {
epoch uint64
rep audit.Reporter
}
// ProcessorPoolSize limits pool size for audit Processor. Processor manages
// audit tasks and fills queue for the next epoch. This process must not be interrupted
// by a new audit epoch, so we limit the pool size for the processor to one.
const ProcessorPoolSize = 1
// New creates audit processor instance.
func New(p *Params) (*Processor, error) {
switch {
case p.Log == nil:
return nil, errors.New("ir/audit: logger is not set")
case p.IRList == nil:
return nil, errors.New("ir/audit: global state is not set")
case p.SGSource == nil:
return nil, errors.New("ir/audit: SG source is not set")
case p.TaskManager == nil:
return nil, errors.New("ir/audit: audit task manager is not set")
case p.Reporter == nil:
return nil, errors.New("ir/audit: audit result reporter is not set")
case p.Key == nil:
return nil, errors.New("ir/audit: signing key is not set")
case p.EpochSource == nil:
return nil, errors.New("ir/audit: epoch source is not set")
}
pool, err := ants.NewPool(ProcessorPoolSize, ants.WithNonblocking(true))
if err != nil {
return nil, fmt.Errorf("ir/audit: can't create worker pool: %w", err)
}
return &Processor{
log: p.Log,
pool: pool,
containerClient: p.ContainerClient,
irList: p.IRList,
sgSrc: p.SGSource,
epochSrc: p.EpochSource,
searchTimeout: p.RPCSearchTimeout,
netmapClient: p.NetmapClient,
taskManager: p.TaskManager,
reporter: p.Reporter,
prevAuditCanceler: func() {},
}, nil
}
// ListenerNotificationParsers for the 'event.Listener' event producer.
func (ap *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
return nil
}
// ListenerNotificationHandlers for the 'event.Listener' event producer.
func (ap *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
return nil
}
// StartAuditHandler for the internal event producer.
func (ap *Processor) StartAuditHandler() event.Handler {
return ap.handleNewAuditRound
}
func (r *epochAuditReporter) WriteReport(rep *audit.Report) error {
res := rep.Result()
res.ForEpoch(r.epoch)
return r.rep.WriteReport(rep)
}

View File

@ -1,65 +0,0 @@
package audit
import (
"errors"
"fmt"
"sort"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
)
var ErrInvalidIRNode = errors.New("node is not in the inner ring list")
func (ap *Processor) selectContainersToAudit(epoch uint64) ([]cid.ID, error) {
containers, err := ap.containerClient.ContainersOf(nil)
if err != nil {
return nil, fmt.Errorf("can't get list of containers to start audit: %w", err)
}
// consider getting extra information about container complexity from
// audit contract there
ap.log.Debug(logs.AuditContainerListingFinished,
zap.Int("total amount", len(containers)),
)
sort.Slice(containers, func(i, j int) bool {
return strings.Compare(containers[i].EncodeToString(), containers[j].EncodeToString()) < 0
})
ind := ap.irList.InnerRingIndex()
irSize := ap.irList.InnerRingSize()
if ind < 0 || ind >= irSize {
return nil, ErrInvalidIRNode
}
return Select(containers, epoch, uint64(ind), uint64(irSize)), nil
}
func Select(ids []cid.ID, epoch, index, size uint64) []cid.ID {
if index >= size {
return nil
}
var a, b uint64
ln := uint64(len(ids))
pivot := ln % size
delta := ln / size
index = (index + epoch) % size
if index < pivot {
a = delta + 1
} else {
a = delta
b = pivot
}
from := a*index + b
to := a*(index+1) + b
return ids[from:to]
}

View File

@ -1,106 +0,0 @@
package audit_test
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
)
func TestSelect(t *testing.T) {
cids := generateContainers(10)
t.Run("invalid input", func(t *testing.T) {
require.Empty(t, audit.Select(cids, 0, 0, 0))
})
t.Run("even split", func(t *testing.T) {
const irSize = 5 // every node takes two audit nodes
m := hitMap(cids)
for i := 0; i < irSize; i++ {
s := audit.Select(cids, 0, uint64(i), irSize)
require.Equal(t, len(cids)/irSize, len(s))
for _, id := range s {
n, ok := m[id.EncodeToString()]
require.True(t, ok)
require.Equal(t, 0, n)
m[id.EncodeToString()] = 1
}
}
require.True(t, allHit(m))
})
t.Run("odd split", func(t *testing.T) {
const irSize = 3
m := hitMap(cids)
for i := 0; i < irSize; i++ {
s := audit.Select(cids, 0, uint64(i), irSize)
for _, id := range s {
n, ok := m[id.EncodeToString()]
require.True(t, ok)
require.Equal(t, 0, n)
m[id.EncodeToString()] = 1
}
}
require.True(t, allHit(m))
})
t.Run("epoch shift", func(t *testing.T) {
const irSize = 4
m := hitMap(cids)
for i := 0; i < irSize; i++ {
s := audit.Select(cids, uint64(i), 0, irSize)
for _, id := range s {
n, ok := m[id.EncodeToString()]
require.True(t, ok)
require.Equal(t, 0, n)
m[id.EncodeToString()] = 1
}
}
require.True(t, allHit(m))
})
}
func generateContainers(n int) []cid.ID {
result := make([]cid.ID, n)
for i := 0; i < n; i++ {
result[i] = cidtest.ID()
}
return result
}
func hitMap(ids []cid.ID) map[string]int {
result := make(map[string]int, len(ids))
for _, id := range ids {
result[id.EncodeToString()] = 0
}
return result
}
func allHit(m map[string]int) bool {
for _, v := range m {
if v == 0 {
return false
}
}
return true
}

View File

@ -2,9 +2,7 @@ package netmap
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
"go.uber.org/zap"
@ -63,8 +61,6 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
np.netmapSnapshot.update(*networkMap, epoch)
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
np.handleNewAudit(audit.NewAuditStartEvent(epoch))
np.handleAuditSettlements(settlement.NewAuditEvent(epoch))
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
np.handleNotaryDeposit(ev)
}

View File

@ -65,10 +65,8 @@ type (
netmapSnapshot cleanupTable
handleNewAudit event.Handler
handleAuditSettlements event.Handler
handleAlphabetSync event.Handler
handleNotaryDeposit event.Handler
handleAlphabetSync event.Handler
handleNotaryDeposit event.Handler
nodeValidator NodeValidator
@ -89,10 +87,8 @@ type (
CleanupThreshold uint64 // in epochs
ContainerWrapper *container.Client
HandleAudit event.Handler
AuditSettlementsHandler event.Handler
AlphabetSyncHandler event.Handler
NotaryDepositHandler event.Handler
AlphabetSyncHandler event.Handler
NotaryDepositHandler event.Handler
NodeValidator NodeValidator
@ -119,10 +115,6 @@ func New(p *Params) (*Processor, error) {
return nil, errors.New("ir/netmap: global state is not set")
case p.AlphabetState == nil:
return nil, errors.New("ir/netmap: global state is not set")
case p.HandleAudit == nil:
return nil, errors.New("ir/netmap: audit handler is not set")
case p.AuditSettlementsHandler == nil:
return nil, errors.New("ir/netmap: audit settlement handler is not set")
case p.AlphabetSyncHandler == nil:
return nil, errors.New("ir/netmap: alphabet sync handler is not set")
case p.NotaryDepositHandler == nil:
@ -151,9 +143,6 @@ func New(p *Params) (*Processor, error) {
netmapClient: p.NetmapClient,
containerWrp: p.ContainerWrapper,
netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold),
handleNewAudit: p.HandleAudit,
handleAuditSettlements: p.AuditSettlementsHandler,
handleAlphabetSync: p.AlphabetSyncHandler,

View File

@ -1,336 +0,0 @@
package audit
import (
"bytes"
"crypto/ecdsa"
"crypto/elliptic"
"encoding/hex"
"math/big"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/audit"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
)
// CalculatePrm groups the required parameters of
// Calculator.CalculateForEpoch call.
type CalculatePrm struct {
// Number of epoch to perform the calculation.
Epoch uint64
}
type singleResultCtx struct {
eAudit uint64
auditResult *audit.Result
log *logger.Logger
txTable *common.TransferTable
cnrInfo common.ContainerInfo
cnrNodes []common.NodeInfo
passNodes map[string]common.NodeInfo
sumSGSize *big.Int
auditFee *big.Int
}
var (
bigGB = big.NewInt(1 << 30)
bigZero = big.NewInt(0)
bigOne = big.NewInt(1)
)
// Calculate calculates payments for audit results in a specific epoch of the network.
// Wraps the results in a money transfer transaction and sends it to the network.
func (c *Calculator) Calculate(p *CalculatePrm) {
log := &logger.Logger{Logger: c.opts.log.With(
zap.Uint64("current epoch", p.Epoch),
)}
if p.Epoch == 0 {
log.Info(logs.AuditSettlementsAreIgnoredForZeroEpoch)
return
}
log.Info(logs.AuditCalculateAuditSettlements)
log.Debug(logs.AuditGettingResultsForThePreviousEpoch)
prevEpoch := p.Epoch - 1
auditResults, err := c.prm.ResultStorage.AuditResultsForEpoch(prevEpoch)
if err != nil {
log.Error(logs.AuditCouldNotCollectAuditResults)
return
} else if len(auditResults) == 0 {
log.Debug(logs.AuditNoAuditResultsInPreviousEpoch)
return
}
auditFee, err := c.prm.AuditFeeFetcher.AuditFee()
if err != nil {
log.Warn(logs.AuditCantFetchAuditFeeFromNetworkConfig,
zap.String("error", err.Error()))
auditFee = 0
}
log.Debug(logs.AuditProcessingAuditResults,
zap.Int("number", len(auditResults)),
)
table := common.NewTransferTable()
for i := range auditResults {
c.processResult(&singleResultCtx{
log: log,
auditResult: auditResults[i],
txTable: table,
auditFee: big.NewInt(0).SetUint64(auditFee),
})
}
log.Debug(logs.AuditProcessingTransfers)
common.TransferAssets(c.prm.Exchanger, table, common.AuditSettlementDetails(prevEpoch))
}
func (c *Calculator) processResult(ctx *singleResultCtx) {
ctx.log = &logger.Logger{Logger: ctx.log.With(
zap.Stringer("cid", ctx.containerID()),
zap.Uint64("audit epoch", ctx.auditResult.Epoch()),
)}
ctx.log.Debug(logs.AuditReadingInformationAboutTheContainer)
ok := c.readContainerInfo(ctx)
if !ok {
return
}
ctx.log.Debug(logs.AuditBuildingPlacement)
ok = c.buildPlacement(ctx)
if !ok {
return
}
ctx.log.Debug(logs.AuditCollectingPassedNodes)
ok = c.collectPassNodes(ctx)
if !ok {
return
}
ctx.log.Debug(logs.AuditCalculatingSumOfTheSizesOfAllStorageGroups)
ok = c.sumSGSizes(ctx)
if !ok {
return
}
ctx.log.Debug(logs.AuditFillingTransferTable)
c.fillTransferTable(ctx)
}
func (c *Calculator) readContainerInfo(ctx *singleResultCtx) bool {
cnr, ok := ctx.auditResult.Container()
if !ok {
ctx.log.Error(logs.AuditMissingContainerInAuditResult)
return false
}
var err error
ctx.cnrInfo, err = c.prm.ContainerStorage.ContainerInfo(cnr)
if err != nil {
ctx.log.Error(logs.AuditCouldNotGetContainerInfo,
zap.String("error", err.Error()),
)
}
return err == nil
}
func (c *Calculator) buildPlacement(ctx *singleResultCtx) bool {
var err error
ctx.cnrNodes, err = c.prm.PlacementCalculator.ContainerNodes(ctx.auditEpoch(), ctx.containerID())
if err != nil {
ctx.log.Error(logs.AuditCouldNotGetContainerNodes,
zap.String("error", err.Error()),
)
}
empty := len(ctx.cnrNodes) == 0
if empty {
ctx.log.Debug(logs.AuditEmptyListOfContainerNodes)
}
return err == nil && !empty
}
func (c *Calculator) collectPassNodes(ctx *singleResultCtx) bool {
ctx.passNodes = make(map[string]common.NodeInfo)
for _, cnrNode := range ctx.cnrNodes {
// TODO(@cthulhu-rider): neofs-sdk-go#241 use dedicated method
ctx.auditResult.IteratePassedStorageNodes(func(passNode []byte) bool {
if !bytes.Equal(cnrNode.PublicKey(), passNode) {
return true
}
failed := false
ctx.auditResult.IterateFailedStorageNodes(func(failNode []byte) bool {
failed = bytes.Equal(cnrNode.PublicKey(), failNode)
return !failed
})
if !failed {
ctx.passNodes[hex.EncodeToString(passNode)] = cnrNode
}
return false
})
}
empty := len(ctx.passNodes) == 0
if empty {
ctx.log.Debug(logs.AuditNoneOfTheContainerNodesPassedTheAudit)
}
return !empty
}
func (c *Calculator) sumSGSizes(ctx *singleResultCtx) bool {
sumPassSGSize := uint64(0)
fail := false
var addr oid.Address
addr.SetContainer(ctx.containerID())
ctx.auditResult.IteratePassedStorageGroups(func(id oid.ID) bool {
addr.SetObject(id)
sgInfo, err := c.prm.SGStorage.SGInfo(addr)
if err != nil {
ctx.log.Error(logs.AuditCouldNotGetSGInfo,
zap.String("id", id.String()),
zap.String("error", err.Error()),
)
fail = true
return false // we also can continue and calculate at least some part
}
sumPassSGSize += sgInfo.Size()
return true
})
if fail {
return false
}
if sumPassSGSize == 0 {
ctx.log.Debug(logs.AuditZeroSumSGSize)
return false
}
ctx.sumSGSize = big.NewInt(int64(sumPassSGSize))
return true
}
func (c *Calculator) fillTransferTable(ctx *singleResultCtx) bool {
cnrOwner := ctx.cnrInfo.Owner()
// add txs to pay for storage node
for k, info := range ctx.passNodes {
ownerID, err := c.prm.AccountStorage.ResolveKey(info)
if err != nil {
ctx.log.Error(logs.AuditCouldNotResolvePublicKeyOfTheStorageNode,
zap.String("error", err.Error()),
zap.String("key", k),
)
return false // we also can continue and calculate at least some part
}
price := info.Price()
ctx.log.Debug(logs.AuditCalculatingStorageNodeSalaryForAudit,
zap.Stringer("sum SG size", ctx.sumSGSize),
zap.Stringer("price", price),
)
fee := big.NewInt(0).Mul(price, ctx.sumSGSize)
fee.Div(fee, bigGB)
if fee.Cmp(bigZero) == 0 {
fee.Add(fee, bigOne)
}
ctx.txTable.Transfer(&common.TransferTx{
From: cnrOwner,
To: *ownerID,
Amount: fee,
})
}
// add txs to pay inner ring node for audit result
auditIR, err := ownerFromKey(ctx.auditResult.AuditorKey())
if err != nil {
ctx.log.Error(logs.AuditCouldNotParsePublicKeyOfTheInnerRingNode,
zap.String("error", err.Error()),
zap.String("key", hex.EncodeToString(ctx.auditResult.AuditorKey())),
)
return false
}
ctx.txTable.Transfer(&common.TransferTx{
From: cnrOwner,
To: *auditIR,
Amount: ctx.auditFee,
})
return false
}
func (c *singleResultCtx) containerID() cid.ID {
cnr, _ := c.auditResult.Container()
return cnr
}
func (c *singleResultCtx) auditEpoch() uint64 {
if c.eAudit == 0 {
c.eAudit = c.auditResult.Epoch()
}
return c.eAudit
}
func ownerFromKey(key []byte) (*user.ID, error) {
pubKey, err := keys.NewPublicKeyFromBytes(key, elliptic.P256())
if err != nil {
return nil, err
}
var id user.ID
user.IDFromKey(&id, (ecdsa.PublicKey)(*pubKey))
return &id, nil
}

View File

@ -1,48 +0,0 @@
package audit
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Calculator represents a component for calculating payments
// based on data audit results and sending remittances to the chain.
type Calculator struct {
prm *CalculatorPrm
opts *options
}
// CalculatorOption is a Calculator constructor's option.
type CalculatorOption func(*options)
type options struct {
log *logger.Logger
}
func defaultOptions() *options {
return &options{
log: &logger.Logger{Logger: zap.L()},
}
}
// NewCalculator creates, initializes and returns a new Calculator instance.
func NewCalculator(p *CalculatorPrm, opts ...CalculatorOption) *Calculator {
o := defaultOptions()
for i := range opts {
opts[i](o)
}
return &Calculator{
prm: p,
opts: o,
}
}
// WithLogger returns an option to specify the logging component.
func WithLogger(l *logger.Logger) CalculatorOption {
return func(o *options) {
o.log = l
}
}

View File

@ -1,49 +0,0 @@
package audit
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/audit"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// CalculatorPrm groups the parameters of Calculator's constructor.
type CalculatorPrm struct {
ResultStorage ResultStorage
ContainerStorage common.ContainerStorage
PlacementCalculator common.PlacementCalculator
SGStorage SGStorage
AccountStorage common.AccountStorage
Exchanger common.Exchanger
AuditFeeFetcher FeeFetcher
}
// ResultStorage is an interface of storage of the audit results.
type ResultStorage interface {
// Must return all audit results by epoch number.
AuditResultsForEpoch(epoch uint64) ([]*audit.Result, error)
}
// SGInfo groups the data about FrostFS storage group
// necessary for calculating audit fee.
type SGInfo interface {
// Must return sum size of the all group members.
Size() uint64
}
// SGStorage is an interface of storage of the storage groups.
type SGStorage interface {
// Must return information about the storage group by address.
SGInfo(oid.Address) (SGInfo, error)
}
// FeeFetcher wraps AuditFee method that returns audit fee price from
// the network configuration.
type FeeFetcher interface {
AuditFee() (uint64, error)
}

View File

@ -1,114 +0,0 @@
package basic
import (
"math/big"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"go.uber.org/zap"
)
var (
bigGB = big.NewInt(1 << 30)
bigZero = big.NewInt(0)
bigOne = big.NewInt(1)
)
func (inc *IncomeSettlementContext) Collect() {
inc.mu.Lock()
defer inc.mu.Unlock()
cachedRate, err := inc.rate.BasicRate()
if err != nil {
inc.log.Error(logs.BasicCantGetBasicIncomeRate,
zap.String("error", err.Error()))
return
}
if cachedRate == 0 {
inc.noop = true
return
}
cnrEstimations, err := inc.estimations.Estimations(inc.epoch)
if err != nil {
inc.log.Error(logs.BasicCantFetchContainerSizeEstimations,
zap.Uint64("epoch", inc.epoch),
zap.String("error", err.Error()))
return
}
txTable := common.NewTransferTable()
for i := range cnrEstimations {
owner, err := inc.container.ContainerInfo(cnrEstimations[i].ContainerID)
if err != nil {
inc.log.Warn(logs.BasicCantFetchContainerInfo,
zap.Uint64("epoch", inc.epoch),
zap.Stringer("container_id", cnrEstimations[i].ContainerID),
zap.String("error", err.Error()))
continue
}
cnrNodes, err := inc.placement.ContainerNodes(inc.epoch, cnrEstimations[i].ContainerID)
if err != nil {
inc.log.Debug(logs.BasicCantFetchContainerInfo,
zap.Uint64("epoch", inc.epoch),
zap.Stringer("container_id", cnrEstimations[i].ContainerID),
zap.String("error", err.Error()))
continue
}
avg := inc.avgEstimation(cnrEstimations[i]) // average container size per node
total := calculateBasicSum(avg, cachedRate, len(cnrNodes))
// fill distribute asset table
for i := range cnrNodes {
inc.distributeTable.Put(cnrNodes[i].PublicKey(), avg)
}
txTable.Transfer(&common.TransferTx{
From: owner.Owner(),
To: inc.bankOwner,
Amount: total,
})
}
common.TransferAssets(inc.exchange, txTable, common.BasicIncomeCollectionDetails(inc.epoch))
}
// avgEstimation returns estimation value for a single container. Right now it
// simply calculates an average of all announcements, however it can be smarter and
// base the result on reputation of the announcers and clever math.
func (inc *IncomeSettlementContext) avgEstimation(e *cntClient.Estimations) (avg uint64) {
if len(e.Values) == 0 {
return 0
}
for i := range e.Values {
avg += e.Values[i].Size
}
return avg / uint64(len(e.Values))
}
func calculateBasicSum(size, rate uint64, ln int) *big.Int {
bigRate := big.NewInt(int64(rate))
total := size * uint64(ln)
price := big.NewInt(0).SetUint64(total)
price.Mul(price, bigRate)
price.Div(price, bigGB)
if price.Cmp(bigZero) == 0 {
price.Add(price, bigOne)
}
return price
}

View File

@ -1,80 +0,0 @@
package basic
import (
"math/big"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/util"
)
type (
EstimationFetcher interface {
Estimations(uint64) ([]*container.Estimations, error)
}
RateFetcher interface {
BasicRate() (uint64, error)
}
// BalanceFetcher uses NEP-17 compatible balance contract.
BalanceFetcher interface {
Balance(id user.ID) (*big.Int, error)
}
IncomeSettlementContext struct {
mu sync.Mutex // lock to prevent collection and distribution in the same time
noop bool
log *logger.Logger
epoch uint64
rate RateFetcher
estimations EstimationFetcher
balances BalanceFetcher
container common.ContainerStorage
placement common.PlacementCalculator
exchange common.Exchanger
accounts common.AccountStorage
bankOwner user.ID
// this table is not thread safe, make sure you use it with mu.Lock()
distributeTable *NodeSizeTable
}
IncomeSettlementContextPrms struct {
Log *logger.Logger
Epoch uint64
Rate RateFetcher
Estimations EstimationFetcher
Balances BalanceFetcher
Container common.ContainerStorage
Placement common.PlacementCalculator
Exchange common.Exchanger
Accounts common.AccountStorage
}
)
func NewIncomeSettlementContext(p *IncomeSettlementContextPrms) *IncomeSettlementContext {
res := &IncomeSettlementContext{
log: p.Log,
epoch: p.Epoch,
rate: p.Rate,
estimations: p.Estimations,
balances: p.Balances,
container: p.Container,
placement: p.Placement,
exchange: p.Exchange,
accounts: p.Accounts,
distributeTable: NewNodeSizeTable(),
}
res.bankOwner.SetScriptHash(util.Uint160{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})
return res
}

View File

@ -1,59 +0,0 @@
package basic
import (
"encoding/hex"
"math/big"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common"
"go.uber.org/zap"
)
func (inc *IncomeSettlementContext) Distribute() {
inc.mu.Lock()
defer inc.mu.Unlock()
if inc.noop {
return
}
txTable := common.NewTransferTable()
bankBalance, err := inc.balances.Balance(inc.bankOwner)
if err != nil {
inc.log.Error(logs.BasicCantFetchBalanceOfBankingAccount,
zap.String("error", err.Error()))
return
}
total := inc.distributeTable.Total()
inc.distributeTable.Iterate(func(key []byte, n *big.Int) {
nodeOwner, err := inc.accounts.ResolveKey(nodeInfoWrapper(key))
if err != nil {
inc.log.Warn(logs.BasicCantTransformPublicKeyToOwnerID,
zap.String("public_key", hex.EncodeToString(key)),
zap.String("error", err.Error()))
return
}
txTable.Transfer(&common.TransferTx{
From: inc.bankOwner,
To: *nodeOwner,
Amount: normalizedValue(n, total, bankBalance),
})
})
common.TransferAssets(inc.exchange, txTable, common.BasicIncomeDistributionDetails(inc.epoch))
}
func normalizedValue(n, total, limit *big.Int) *big.Int {
if limit.Cmp(bigZero) == 0 {
return big.NewInt(0)
}
n.Mul(n, limit)
return n.Div(n, total)
}

View File

@ -1,54 +0,0 @@
package basic
import (
"math/big"
"testing"
"github.com/stretchr/testify/require"
)
type normalizedValueCase struct {
name string
n, total, limit uint64
expected uint64
}
func TestNormalizedValues(t *testing.T) {
testCases := []normalizedValueCase{
{
name: "zero limit",
n: 50,
total: 100,
limit: 0,
expected: 0,
},
{
name: "scale down",
n: 50,
total: 100,
limit: 10,
expected: 5,
},
{
name: "scale up",
n: 50,
total: 100,
limit: 1000,
expected: 500,
},
}
for _, testCase := range testCases {
testNormalizedValues(t, testCase)
}
}
func testNormalizedValues(t *testing.T, c normalizedValueCase) {
n := big.NewInt(0).SetUint64(c.n)
total := big.NewInt(0).SetUint64(c.total)
limit := big.NewInt(0).SetUint64(c.limit)
exp := big.NewInt(0).SetUint64(c.expected)
got := normalizedValue(n, total, limit)
require.Zero(t, exp.Cmp(got), c.name)
}

View File

@ -1,44 +0,0 @@
package basic
import (
"math/big"
)
// NodeSizeTable is not thread safe, make sure it is accessed with external
// locks or in single routine.
type NodeSizeTable struct {
prices map[string]uint64
total uint64
}
func (t *NodeSizeTable) Put(id []byte, avg uint64) {
t.prices[string(id)] += avg
t.total += avg
}
func (t *NodeSizeTable) Total() *big.Int {
return big.NewInt(0).SetUint64(t.total)
}
func (t *NodeSizeTable) Iterate(f func([]byte, *big.Int)) {
for k, v := range t.prices {
n := big.NewInt(0).SetUint64(v)
f([]byte(k), n)
}
}
func NewNodeSizeTable() *NodeSizeTable {
return &NodeSizeTable{
prices: make(map[string]uint64),
}
}
type nodeInfoWrapper []byte
func (nodeInfoWrapper) Price() *big.Int {
panic("should not be used")
}
func (n nodeInfoWrapper) PublicKey() []byte {
return n
}

View File

@ -1,133 +0,0 @@
package settlement
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// HandleAuditEvent catches a new AuditEvent and
// adds AuditProcessor call to the execution queue.
func (p *Processor) HandleAuditEvent(e event.Event) {
ev := e.(AuditEvent)
epoch := ev.Epoch()
if !p.state.IsAlphabet() {
p.log.Info(logs.SettlementNonAlphabetModeIgnoreAuditPayments)
return
}
log := &logger.Logger{Logger: p.log.With(
zap.Uint64("epoch", epoch),
)}
log.Info(logs.SettlementNewAuditSettlementEvent)
if epoch == 0 {
log.Debug(logs.SettlementIgnoreGenesisEpoch)
return
}
handler := &auditEventHandler{
log: log,
epoch: epoch,
proc: p.auditProc,
}
err := p.pool.Submit(handler.handle)
if err != nil {
log.Warn(logs.SettlementCouldNotAddHandlerOfAuditEventToQueue,
zap.String("error", err.Error()),
)
return
}
log.Debug(logs.SettlementAuditEventHandlingSuccessfullyScheduled)
}
func (p *Processor) HandleIncomeCollectionEvent(e event.Event) {
ev := e.(BasicIncomeCollectEvent)
epoch := ev.Epoch()
if !p.state.IsAlphabet() {
p.log.Info(logs.SettlementNonAlphabetModeIgnoreIncomeCollectionEvent)
return
}
p.log.Info(logs.SettlementStartBasicIncomeCollection,
zap.Uint64("epoch", epoch))
p.contextMu.Lock()
defer p.contextMu.Unlock()
if _, ok := p.incomeContexts[epoch]; ok {
p.log.Error(logs.SettlementIncomeContextAlreadyExists,
zap.Uint64("epoch", epoch))
return
}
incomeCtx, err := p.basicIncome.CreateContext(epoch)
if err != nil {
p.log.Error(logs.SettlementCantCreateIncomeContext,
zap.String("error", err.Error()))
return
}
p.incomeContexts[epoch] = incomeCtx
err = p.pool.Submit(func() {
incomeCtx.Collect()
})
if err != nil {
p.log.Warn(logs.SettlementCouldNotAddHandlerOfBasicIncomeCollectionToQueue,
zap.String("error", err.Error()),
)
return
}
}
func (p *Processor) HandleIncomeDistributionEvent(e event.Event) {
ev := e.(BasicIncomeDistributeEvent)
epoch := ev.Epoch()
if !p.state.IsAlphabet() {
p.log.Info(logs.SettlementNonAlphabetModeIgnoreIncomeDistributionEvent)
return
}
p.log.Info(logs.SettlementStartBasicIncomeDistribution,
zap.Uint64("epoch", epoch))
p.contextMu.Lock()
defer p.contextMu.Unlock()
incomeCtx, ok := p.incomeContexts[epoch]
delete(p.incomeContexts, epoch)
if !ok {
p.log.Warn(logs.SettlementIncomeContextDistributionDoesNotExists,
zap.Uint64("epoch", epoch))
return
}
err := p.pool.Submit(func() {
incomeCtx.Distribute()
})
if err != nil {
p.log.Warn(logs.SettlementCouldNotAddHandlerOfBasicIncomeDistributionToQueue,
zap.String("error", err.Error()),
)
return
}
}

View File

@ -1,33 +0,0 @@
package common
import (
"encoding/binary"
)
var (
auditPrefix = []byte{0x40}
basicIncomeCollectionPrefix = []byte{0x41}
basicIncomeDistributionPrefix = []byte{0x42}
)
func AuditSettlementDetails(epoch uint64) []byte {
return details(auditPrefix, epoch)
}
func BasicIncomeCollectionDetails(epoch uint64) []byte {
return details(basicIncomeCollectionPrefix, epoch)
}
func BasicIncomeDistributionDetails(epoch uint64) []byte {
return details(basicIncomeDistributionPrefix, epoch)
}
func details(prefix []byte, epoch uint64) []byte {
prefixLen := len(prefix)
buf := make([]byte, prefixLen+8)
copy(buf, prefix)
binary.LittleEndian.PutUint64(buf[prefixLen:], epoch)
return buf
}

View File

@ -1,28 +0,0 @@
package common
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestAuditSettlementDetails(t *testing.T) {
var n uint64 = 1994 // 0x7CA
exp := []byte{0x40, 0xCA, 0x07, 0, 0, 0, 0, 0, 0}
got := AuditSettlementDetails(n)
require.Equal(t, exp, got)
}
func TestBasicIncomeCollectionDetails(t *testing.T) {
var n uint64 = 1994 // 0x7CA
exp := []byte{0x41, 0xCA, 0x07, 0, 0, 0, 0, 0, 0}
got := BasicIncomeCollectionDetails(n)
require.Equal(t, exp, got)
}
func TestBasicIncomeDistributionDetails(t *testing.T) {
var n uint64 = 1994 // 0x7CA
exp := []byte{0x42, 0xCA, 0x07, 0, 0, 0, 0, 0, 0}
got := BasicIncomeDistributionDetails(n)
require.Equal(t, exp, got)
}

View File

@ -1,54 +0,0 @@
package common
import (
"math/big"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
// NodeInfo groups the data about the storage node
// necessary for calculating audit fees.
type NodeInfo interface {
// Must return storage price of the node for one epoch in GASe-12.
Price() *big.Int
// Must return public key of the node.
PublicKey() []byte
}
// ContainerInfo groups the data about FrostFS container
// necessary for calculating audit fee.
type ContainerInfo interface {
// Must return identifier of the container owner.
Owner() user.ID
}
// ContainerStorage is an interface of
// storage of the FrostFS containers.
type ContainerStorage interface {
// Must return information about the container by ID.
ContainerInfo(cid.ID) (ContainerInfo, error)
}
// PlacementCalculator is a component interface
// that builds placement vectors.
type PlacementCalculator interface {
// Must return information about the nodes from container by its ID of the given epoch.
ContainerNodes(uint64, cid.ID) ([]NodeInfo, error)
}
// AccountStorage is an network member accounts interface.
type AccountStorage interface {
// Must resolve information about the storage node
// to its ID in system.
ResolveKey(NodeInfo) (*user.ID, error)
}
// Exchanger is an interface of monetary component.
type Exchanger interface {
// Must transfer amount of GASe-12 from sender to recipient.
//
// Amount must be positive.
Transfer(sender, recipient user.ID, amount *big.Int, details []byte)
}

View File

@ -1,74 +0,0 @@
package common
import (
"math/big"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
type TransferTable struct {
txs map[string]map[string]*TransferTx
}
type TransferTx struct {
From, To user.ID
Amount *big.Int
}
func NewTransferTable() *TransferTable {
return &TransferTable{
txs: make(map[string]map[string]*TransferTx),
}
}
func (t *TransferTable) Transfer(tx *TransferTx) {
if tx.From.Equals(tx.To) {
return
}
from, to := tx.From.EncodeToString(), tx.To.EncodeToString()
m, ok := t.txs[from]
if !ok {
if m, ok = t.txs[to]; ok {
to = from // ignore `From = To` swap because `From` doesn't require
tx.Amount.Neg(tx.Amount)
} else {
m = make(map[string]*TransferTx, 1)
t.txs[from] = m
}
}
tgt, ok := m[to]
if !ok {
m[to] = tx
return
}
tgt.Amount.Add(tgt.Amount, tx.Amount)
}
func (t *TransferTable) Iterate(f func(*TransferTx)) {
for _, m := range t.txs {
for _, tx := range m {
f(tx)
}
}
}
func TransferAssets(e Exchanger, t *TransferTable, details []byte) {
t.Iterate(func(tx *TransferTx) {
sign := tx.Amount.Sign()
if sign == 0 {
return
}
if sign < 0 {
tx.From, tx.To = tx.To, tx.From
tx.Amount.Neg(tx.Amount)
}
e.Transfer(tx.From, tx.To, tx.Amount, details)
})
}

View File

@ -1,17 +0,0 @@
package settlement
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/basic"
)
// AuditProcessor is an interface of data audit fee processor.
type AuditProcessor interface {
// Must process data audit conducted in epoch.
ProcessAuditSettlements(epoch uint64)
}
// BasicIncomeInitializer is an interface of basic income context creator.
type BasicIncomeInitializer interface {
// Creates context that processes basic income for provided epoch.
CreateContext(epoch uint64) (*basic.IncomeSettlementContext, error)
}

View File

@ -1,46 +0,0 @@
package settlement
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
)
// AuditEvent is an event of the start of
// cash settlements for data audit.
type AuditEvent struct {
epoch uint64
}
type (
BasicIncomeCollectEvent = AuditEvent
BasicIncomeDistributeEvent = AuditEvent
)
// MorphEvent implements Neo:Morph event.
func (e AuditEvent) MorphEvent() {}
// NewAuditEvent creates new AuditEvent for epoch.
func NewAuditEvent(epoch uint64) event.Event {
return AuditEvent{
epoch: epoch,
}
}
// Epoch returns the number of the epoch
// in which the event was generated.
func (e AuditEvent) Epoch() uint64 {
return e.epoch
}
// NewBasicIncomeCollectEvent for epoch.
func NewBasicIncomeCollectEvent(epoch uint64) event.Event {
return BasicIncomeCollectEvent{
epoch: epoch,
}
}
// NewBasicIncomeDistributeEvent for epoch.
func NewBasicIncomeDistributeEvent(epoch uint64) event.Event {
return BasicIncomeDistributeEvent{
epoch: epoch,
}
}

View File

@ -1,22 +0,0 @@
package settlement
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
type auditEventHandler struct {
log *logger.Logger
epoch uint64
proc AuditProcessor
}
func (p *auditEventHandler) handle() {
p.log.Info(logs.SettlementProcessAuditSettlements)
p.proc.ProcessAuditSettlements(p.epoch)
p.log.Info(logs.SettlementAuditProcessingFinished)
}

View File

@ -1,31 +0,0 @@
package settlement
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
// Option is a Processor constructor's option.
type Option func(*options)
type options struct {
poolSize int
log *logger.Logger
}
func defaultOptions() *options {
const poolSize = 10
return &options{
poolSize: poolSize,
log: &logger.Logger{Logger: zap.L()},
}
}
// WithLogger returns option to override the component for logging.
func WithLogger(l *logger.Logger) Option {
return func(o *options) {
o.log = l
}
}

View File

@ -1,79 +0,0 @@
package settlement
import (
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/basic"
nodeutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
type (
// AlphabetState is a callback interface for inner ring global state.
AlphabetState interface {
IsAlphabet() bool
}
// Processor is an event handler for payments in the system.
Processor struct {
log *logger.Logger
state AlphabetState
pool nodeutil.WorkerPool
auditProc AuditProcessor
basicIncome BasicIncomeInitializer
contextMu sync.Mutex
incomeContexts map[uint64]*basic.IncomeSettlementContext
}
// Prm groups the required parameters of Processor's constructor.
Prm struct {
AuditProcessor AuditProcessor
BasicIncome BasicIncomeInitializer
State AlphabetState
}
)
func panicOnPrmValue(n string, v any) {
panic(fmt.Sprintf("invalid parameter %s (%T):%v", n, v, v))
}
// New creates and returns a new Processor instance.
func New(prm Prm, opts ...Option) *Processor {
switch {
case prm.AuditProcessor == nil:
panicOnPrmValue("AuditProcessor", prm.AuditProcessor)
}
o := defaultOptions()
for i := range opts {
opts[i](o)
}
pool, err := ants.NewPool(o.poolSize, ants.WithNonblocking(true))
if err != nil {
panic(fmt.Errorf("could not create worker pool: %w", err))
}
o.log.Debug(logs.SettlementWorkerPoolForSettlementProcessorSuccessfullyInitialized,
zap.Int("capacity", o.poolSize),
)
return &Processor{
log: o.log,
state: prm.State,
pool: pool,
auditProc: prm.AuditProcessor,
basicIncome: prm.BasicIncome,
incomeContexts: make(map[uint64]*basic.IncomeSettlementContext),
}
}

View File

@ -1,236 +0,0 @@
package innerring
import (
"context"
"crypto/ecdsa"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
storagegroup2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/storagegroup"
frostfsapiclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/auditor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/storagegroup"
"go.uber.org/zap"
)
type (
ClientCache struct {
log *logger.Logger
cache interface {
Get(clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
CloseAll()
}
key *ecdsa.PrivateKey
sgTimeout, headTimeout, rangeTimeout time.Duration
}
clientCacheParams struct {
Log *logger.Logger
Key *ecdsa.PrivateKey
AllowExternal bool
SGTimeout, HeadTimeout, RangeTimeout time.Duration
}
)
func newClientCache(p *clientCacheParams) *ClientCache {
return &ClientCache{
log: p.Log,
cache: cache.NewSDKClientCache(cache.ClientCacheOpts{AllowExternal: p.AllowExternal, Key: p.Key}),
key: p.Key,
sgTimeout: p.SGTimeout,
headTimeout: p.HeadTimeout,
rangeTimeout: p.RangeTimeout,
}
}
func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.Client, error) {
// Because cache is used by `ClientCache` exclusively,
// client will always have valid key.
return c.cache.Get(info)
}
// GetSG polls the container to get the object by id.
// Returns storage groups structure from received object.
//
// Returns an error of type apistatus.ObjectNotFound if storage group is missing.
func (c *ClientCache) GetSG(ctx context.Context, prm storagegroup2.GetSGPrm) (*storagegroup.StorageGroup, error) {
var sgAddress oid.Address
sgAddress.SetContainer(prm.CID)
sgAddress.SetObject(prm.OID)
return c.getSG(ctx, sgAddress, &prm.NetMap, prm.Container)
}
func (c *ClientCache) getSG(ctx context.Context, addr oid.Address, nm *netmap.NetMap, cn [][]netmap.NodeInfo) (*storagegroup.StorageGroup, error) {
obj := addr.Object()
nodes, err := placement.BuildObjectPlacement(nm, cn, &obj)
if err != nil {
return nil, fmt.Errorf("can't build object placement: %w", err)
}
var info clientcore.NodeInfo
var getObjPrm frostfsapiclient.GetObjectPrm
getObjPrm.SetAddress(addr)
for _, node := range placement.FlattenNodes(nodes) {
err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(node))
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
cli, err := c.getWrappedClient(info)
if err != nil {
c.log.Warn(logs.InnerringCantSetupRemoteConnection,
zap.String("error", err.Error()))
continue
}
ctx, cancel := context.WithTimeout(ctx, c.sgTimeout)
// NOTE: we use the function which does not verify object integrity (checksums, signature),
// but it would be useful to do as part of a data audit.
res, err := cli.GetObject(ctx, getObjPrm)
cancel()
if err != nil {
c.log.Warn(logs.InnerringCantGetStorageGroupObject,
zap.String("error", err.Error()))
continue
}
var sg storagegroup.StorageGroup
err = storagegroup.ReadFromObject(&sg, *res.Object())
if err != nil {
return nil, fmt.Errorf("can't parse storage group from a object: %w", err)
}
return &sg, nil
}
var errNotFound apistatus.ObjectNotFound
return nil, errNotFound
}
// GetHeader requests node from the container under audit to return object header by id.
func (c *ClientCache) GetHeader(ctx context.Context, prm auditor.GetHeaderPrm) (*object.Object, error) {
var objAddress oid.Address
objAddress.SetContainer(prm.CID)
objAddress.SetObject(prm.OID)
var info clientcore.NodeInfo
err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(prm.Node))
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
cli, err := c.getWrappedClient(info)
if err != nil {
return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err)
}
cctx, cancel := context.WithTimeout(ctx, c.headTimeout)
var obj *object.Object
if prm.NodeIsRelay {
obj, err = frostfsapiclient.GetObjectHeaderFromContainer(cctx, cli, objAddress)
} else {
obj, err = frostfsapiclient.GetRawObjectHeaderLocally(cctx, cli, objAddress)
}
cancel()
if err != nil {
return nil, fmt.Errorf("object head error: %w", err)
}
return obj, nil
}
// GetRangeHash requests node from the container under audit to return Tillich-Zemor hash of the
// payload range of the object with specified identifier.
func (c *ClientCache) GetRangeHash(ctx context.Context, prm auditor.GetRangeHashPrm) ([]byte, error) {
var objAddress oid.Address
objAddress.SetContainer(prm.CID)
objAddress.SetObject(prm.OID)
var info clientcore.NodeInfo
err := clientcore.NodeInfoFromRawNetmapElement(&info, netmapcore.Node(prm.Node))
if err != nil {
return nil, fmt.Errorf("parse client node info: %w", err)
}
cli, err := c.getWrappedClient(info)
if err != nil {
return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err)
}
cctx, cancel := context.WithTimeout(ctx, c.rangeTimeout)
h, err := frostfsapiclient.HashObjectRange(cctx, cli, objAddress, prm.Range)
cancel()
if err != nil {
return nil, fmt.Errorf("object rangehash error: %w", err)
}
return h, nil
}
func (c *ClientCache) getWrappedClient(info clientcore.NodeInfo) (frostfsapiclient.Client, error) {
// can be also cached
var cInternal frostfsapiclient.Client
cli, err := c.Get(info)
if err != nil {
return cInternal, fmt.Errorf("could not get API client from cache")
}
cInternal.WrapBasicClient(cli)
cInternal.SetPrivateKey(c.key)
return cInternal, nil
}
func (c ClientCache) ListSG(ctx context.Context, dst *storagegroup2.SearchSGDst, prm storagegroup2.SearchSGPrm) error {
cli, err := c.getWrappedClient(prm.NodeInfo)
if err != nil {
return fmt.Errorf("could not get API client from cache")
}
var cliPrm frostfsapiclient.SearchSGPrm
cliPrm.SetContainerID(prm.Container)
res, err := cli.SearchSG(ctx, cliPrm)
if err != nil {
return err
}
dst.Objects = res.IDList()
return nil
}

View File

@ -1,301 +0,0 @@
package innerring
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/sha256"
"encoding/hex"
"fmt"
"math/big"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/basic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/common"
auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit"
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
netmapClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
auditAPI "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/audit"
containerAPI "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapAPI "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/storagegroup"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
)
const (
auditSettlementContext = "audit"
basicIncomeSettlementContext = "basic income"
)
type settlementDeps struct {
log *logger.Logger
cnrSrc container.Source
auditClient *auditClient.Client
nmClient *netmapClient.Client
clientCache *ClientCache
balanceClient *balanceClient.Client
settlementCtx string
}
type auditSettlementDeps struct {
settlementDeps
}
type basicIncomeSettlementDeps struct {
settlementDeps
cnrClient *containerClient.Client
}
type basicSettlementConstructor struct {
dep *basicIncomeSettlementDeps
}
type auditSettlementCalculator audit.Calculator
type containerWrapper containerAPI.Container
type nodeInfoWrapper struct {
ni netmapAPI.NodeInfo
}
type sgWrapper storagegroup.StorageGroup
func (s *sgWrapper) Size() uint64 {
return (*storagegroup.StorageGroup)(s).ValidationDataSize()
}
func (n nodeInfoWrapper) PublicKey() []byte {
return n.ni.PublicKey()
}
func (n nodeInfoWrapper) Price() *big.Int {
return big.NewInt(int64(n.ni.Price()))
}
func (c containerWrapper) Owner() user.ID {
return (containerAPI.Container)(c).Owner()
}
func (s settlementDeps) AuditResultsForEpoch(epoch uint64) ([]*auditAPI.Result, error) {
idList, err := s.auditClient.ListAuditResultIDByEpoch(epoch)
if err != nil {
return nil, fmt.Errorf("could not list audit results in sidechain: %w", err)
}
res := make([]*auditAPI.Result, 0, len(idList))
for i := range idList {
r, err := s.auditClient.GetAuditResult(idList[i])
if err != nil {
return nil, fmt.Errorf("could not get audit result: %w", err)
}
res = append(res, r)
}
return res, nil
}
func (s settlementDeps) ContainerInfo(cid cid.ID) (common.ContainerInfo, error) {
cnr, err := s.cnrSrc.Get(cid)
if err != nil {
return nil, fmt.Errorf("could not get container from storage: %w", err)
}
return (containerWrapper)(cnr.Value), nil
}
func (s settlementDeps) buildContainer(e uint64, cid cid.ID) ([][]netmapAPI.NodeInfo, *netmapAPI.NetMap, error) {
var (
nm *netmapAPI.NetMap
err error
)
if e > 0 {
nm, err = s.nmClient.GetNetMapByEpoch(e)
} else {
nm, err = netmap.GetLatestNetworkMap(s.nmClient)
}
if err != nil {
return nil, nil, fmt.Errorf("could not get network map from storage: %w", err)
}
cnr, err := s.cnrSrc.Get(cid)
if err != nil {
return nil, nil, fmt.Errorf("could not get container from sidechain: %w", err)
}
binCnr := make([]byte, sha256.Size)
cid.Encode(binCnr)
cn, err := nm.ContainerNodes(
cnr.Value.PlacementPolicy(),
binCnr, // may be replace pivot calculation to frostfs-api-go
)
if err != nil {
return nil, nil, fmt.Errorf("could not calculate container nodes: %w", err)
}
return cn, nm, nil
}
func (s settlementDeps) ContainerNodes(e uint64, cid cid.ID) ([]common.NodeInfo, error) {
cn, _, err := s.buildContainer(e, cid)
if err != nil {
return nil, err
}
var sz int
for i := range cn {
sz += len(cn[i])
}
res := make([]common.NodeInfo, 0, sz)
for i := range cn {
for j := range cn[i] {
res = append(res, nodeInfoWrapper{
ni: cn[i][j],
})
}
}
return res, nil
}
// SGInfo returns audit.SGInfo by object address.
//
// Returns an error of type apistatus.ObjectNotFound if storage group is missing.
func (s settlementDeps) SGInfo(addr oid.Address) (audit.SGInfo, error) {
cnr := addr.Container()
cn, nm, err := s.buildContainer(0, cnr)
if err != nil {
return nil, err
}
sg, err := s.clientCache.getSG(context.Background(), addr, nm, cn)
if err != nil {
return nil, err
}
return (*sgWrapper)(sg), nil
}
func (s settlementDeps) ResolveKey(ni common.NodeInfo) (*user.ID, error) {
pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256())
if err != nil {
return nil, err
}
var id user.ID
user.IDFromKey(&id, (ecdsa.PublicKey)(*pub))
return &id, nil
}
func (s settlementDeps) Transfer(sender, recipient user.ID, amount *big.Int, details []byte) {
if s.settlementCtx == "" {
panic("unknown settlement deps context")
}
log := s.log.With(
zap.Stringer("sender", sender),
zap.Stringer("recipient", recipient),
zap.Stringer("amount (GASe-12)", amount),
zap.String("details", hex.EncodeToString(details)),
)
if !amount.IsInt64() {
s.log.Error(logs.InnerringAmountCanNotBeRepresentedAsAnInt64)
return
}
params := balanceClient.TransferPrm{
Amount: amount.Int64(),
From: sender,
To: recipient,
Details: details,
}
err := s.balanceClient.TransferX(params)
if err != nil {
log.Error(fmt.Sprintf("%s: could not send transfer", s.settlementCtx),
zap.String("error", err.Error()),
)
return
}
log.Debug(fmt.Sprintf("%s: transfer was successfully sent", s.settlementCtx))
}
func (b basicIncomeSettlementDeps) BasicRate() (uint64, error) {
return b.nmClient.BasicIncomeRate()
}
func (b basicIncomeSettlementDeps) Estimations(epoch uint64) ([]*containerClient.Estimations, error) {
estimationIDs, err := b.cnrClient.ListLoadEstimationsByEpoch(epoch)
if err != nil {
return nil, err
}
result := make([]*containerClient.Estimations, 0, len(estimationIDs))
for i := range estimationIDs {
estimation, err := b.cnrClient.GetUsedSpaceEstimations(estimationIDs[i])
if err != nil {
b.log.Warn(logs.InnerringCantGetUsedSpaceEstimation,
zap.String("estimation_id", hex.EncodeToString(estimationIDs[i])),
zap.String("error", err.Error()))
continue
}
result = append(result, estimation)
}
return result, nil
}
func (b basicIncomeSettlementDeps) Balance(id user.ID) (*big.Int, error) {
return b.balanceClient.BalanceOf(id)
}
func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) {
(*audit.Calculator)(s).Calculate(&audit.CalculatePrm{
Epoch: epoch,
})
}
func (b *basicSettlementConstructor) CreateContext(epoch uint64) (*basic.IncomeSettlementContext, error) {
return basic.NewIncomeSettlementContext(&basic.IncomeSettlementContextPrms{
Log: b.dep.log,
Epoch: epoch,
Rate: b.dep,
Estimations: b.dep,
Balances: b.dep,
Container: b.dep,
Placement: b.dep,
Exchange: b.dep,
Accounts: b.dep,
}), nil
}

View File

@ -6,8 +6,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit"
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -146,18 +144,6 @@ func (s *Server) VoteForSidechainValidator(prm governance.VoteValidatorPrm) erro
return s.voteForSidechainValidator(prm)
}
// WriteReport composes the audit result structure from the audit report
// and sends it to Audit contract.
func (s *Server) WriteReport(r *audit.Report) error {
res := r.Result()
res.SetAuditorKey(s.pubKey)
prm := auditClient.PutPrm{}
prm.SetResult(res)
return s.auditClient.PutAuditResult(prm)
}
// ResetEpochTimer resets the block timer that produces events to update epoch
// counter in the netmap contract. It is used to synchronize this even production
// based on the block with a notification of the last epoch.