diff --git a/pkg/innerring/processors/settlement/audit/calculate.go b/pkg/innerring/processors/settlement/audit/calculate.go new file mode 100644 index 000000000..6bea99850 --- /dev/null +++ b/pkg/innerring/processors/settlement/audit/calculate.go @@ -0,0 +1,269 @@ +package audit + +import ( + "bytes" + "encoding/hex" + "math/big" + + "github.com/nspcc-dev/neofs-api-go/pkg/audit" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// CalculatePrm groups required parameters of +// Calculator.CalculateForEpoch call. +type CalculatePrm struct { + // Number of epoch to perform the calculation. + Epoch uint64 +} + +type singleResultCtx struct { + eAudit uint64 + + auditResult *audit.Result + + log *logger.Logger + + cid *container.ID + + txTable *transferTable + + cnrInfo ContainerInfo + + cnrNodes []NodeInfo + + passNodes map[string]NodeInfo + + sumSGSize *big.Int +} + +var bigGB = big.NewInt(1 << 30) + +// Calculate calculates payments for audit results in a specific epoch of the network. +// Wraps the results in a money transfer transaction and sends it to the network. +func (c *Calculator) Calculate(p *CalculatePrm) { + log := c.opts.log.With( + zap.Uint64("current epoch", p.Epoch), + ) + + log.Info("calculate audit settlements") + + log.Debug("getting results for the previous epoch") + + auditResults, err := c.prm.ResultStorage.AuditResultsForEpoch(p.Epoch - 1) + if err != nil { + log.Error("could not collect audit results") + return + } else if len(auditResults) == 0 { + log.Debug("no audit results in previous epoch") + return + } + + log.Debug("processing audit results", + zap.Int("number", len(auditResults)), + ) + + table := newTransferTable() + + for i := range auditResults { + c.processResult(&singleResultCtx{ + log: log, + auditResult: auditResults[i], + txTable: table, + }) + } + + log.Debug("processing transfers") + + table.iterate(func(tx *transferTx) { + c.prm.Exchanger.Transfer(tx.from, tx.to, tx.amount) + }) +} + +func (c *Calculator) processResult(ctx *singleResultCtx) { + ctx.log = ctx.log.With( + zap.Stringer("cid", ctx.containerID()), + zap.Uint64("audit epoch", ctx.auditResult.AuditEpoch()), + ) + + ctx.log.Debug("reading information about the container") + + ok := c.readContainerInfo(ctx) + if !ok { + return + } + + ctx.log.Debug("building placement") + + ok = c.buildPlacement(ctx) + if !ok { + return + } + + ctx.log.Debug("collecting passed nodes") + + ok = c.collectPassNodes(ctx) + if !ok { + return + } + + ctx.log.Debug("calculating sum of the sizes of all storage groups") + + ok = c.sumSGSizes(ctx) + if !ok { + return + } + + ctx.log.Debug("filling transfer table") + + c.fillTransferTable(ctx) +} + +func (c *Calculator) readContainerInfo(ctx *singleResultCtx) bool { + var err error + + ctx.cnrInfo, err = c.prm.ContainerStorage.ContainerInfo(ctx.auditResult.ContainerID()) + if err != nil { + ctx.log.Error("could not get container info", + zap.String("error", err.Error()), + ) + } + + return err == nil +} + +func (c *Calculator) buildPlacement(ctx *singleResultCtx) bool { + var err error + + ctx.cnrNodes, err = c.prm.PlacementCalculator.ContainerNodes(ctx.auditEpoch(), ctx.containerID()) + if err != nil { + ctx.log.Error("could not get container nodes", + zap.String("error", err.Error()), + ) + } + + empty := len(ctx.cnrNodes) == 0 + if empty { + ctx.log.Debug("empty list of container nodes") + } + + return err == nil && !empty +} + +func (c *Calculator) collectPassNodes(ctx *singleResultCtx) bool { + ctx.passNodes = make(map[string]NodeInfo) + +loop: + for _, cnrNode := range ctx.cnrNodes { + for _, passNode := range ctx.auditResult.PassNodes() { + if !bytes.Equal(cnrNode.PublicKey(), passNode) { + continue + } + + for _, failNode := range ctx.auditResult.FailNodes() { + if bytes.Equal(cnrNode.PublicKey(), failNode) { + continue loop + } + } + + ctx.passNodes[hex.EncodeToString(passNode)] = cnrNode + } + } + + empty := len(ctx.passNodes) == 0 + if empty { + ctx.log.Debug("none of the container nodes passed the audit") + } + + return !empty +} + +func (c *Calculator) sumSGSizes(ctx *singleResultCtx) bool { + passedSG := ctx.auditResult.PassSG() + + if len(passedSG) == 0 { + ctx.log.Debug("empty list of passed SG") + return false + } + + sumPassSGSize := uint64(0) + + addr := object.NewAddress() + addr.SetContainerID(ctx.containerID()) + + for _, sgID := range ctx.auditResult.PassSG() { + addr.SetObjectID(sgID) + + sgInfo, err := c.prm.SGStorage.SGInfo(addr) + if err != nil { + ctx.log.Error("could not get SG info", + zap.Stringer("id", sgID), + ) + + return false // we also can continue and calculate at least some part + } + + sumPassSGSize += sgInfo.Size() + } + + if sumPassSGSize == 0 { + ctx.log.Debug("zero sum SG size") + return false + } + + ctx.sumSGSize = big.NewInt(int64(sumPassSGSize)) + + return true +} + +func (c *Calculator) fillTransferTable(ctx *singleResultCtx) bool { + cnrOwner := ctx.cnrInfo.Owner() + + for k, info := range ctx.passNodes { + ownerID, err := c.prm.AccountStorage.ResolveKey(info) + if err != nil { + ctx.log.Error("could not resolve public key of the storage node", + zap.String("error", err.Error()), + zap.String("key", k), + ) + + return false // we also can continue and calculate at least some part + } + + price := info.Price() + + ctx.log.Debug("calculating storage node salary for audit (GASe-12)", + zap.Stringer("sum SG size", ctx.sumSGSize), + zap.Stringer("price", price), + ) + + fee := big.NewInt(0).Mul(price, ctx.sumSGSize) + fee.Div(fee, bigGB) + + ctx.txTable.transfer(&transferTx{ + from: cnrOwner, + to: ownerID, + amount: fee, + }) + } + + return false +} + +func (c *singleResultCtx) containerID() *container.ID { + if c.cid == nil { + c.cid = c.auditResult.ContainerID() + } + + return c.cid +} + +func (c *singleResultCtx) auditEpoch() uint64 { + if c.eAudit == 0 { + c.eAudit = c.auditResult.AuditEpoch() + } + + return c.eAudit +} diff --git a/pkg/innerring/processors/settlement/audit/calculator.go b/pkg/innerring/processors/settlement/audit/calculator.go new file mode 100644 index 000000000..69932ec6b --- /dev/null +++ b/pkg/innerring/processors/settlement/audit/calculator.go @@ -0,0 +1,48 @@ +package audit + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Calculator represents component for calculating payments +// based on data audit results and sending remittances to the chain. +type Calculator struct { + prm *CalculatorPrm + + opts *options +} + +// CalculatorOption is a Calculator constructor's option. +type CalculatorOption func(*options) + +type options struct { + log *logger.Logger +} + +func defaultOptions() *options { + return &options{ + log: zap.L(), + } +} + +// NewCalculator creates, initializes and returns new Calculator instance. +func NewCalculator(p *CalculatorPrm, opts ...CalculatorOption) *Calculator { + o := defaultOptions() + + for i := range opts { + opts[i](o) + } + + return &Calculator{ + prm: p, + opts: o, + } +} + +// WithLogger returns option to specify logging component. +func WithLogger(l *logger.Logger) CalculatorOption { + return func(o *options) { + o.log = l + } +} diff --git a/pkg/innerring/processors/settlement/audit/prm.go b/pkg/innerring/processors/settlement/audit/prm.go new file mode 100644 index 000000000..379396d90 --- /dev/null +++ b/pkg/innerring/processors/settlement/audit/prm.go @@ -0,0 +1,90 @@ +package audit + +import ( + "math/big" + + "github.com/nspcc-dev/neofs-api-go/pkg/audit" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" +) + +// CalculatorPrm groups the parameters of Calculator's constructor. +type CalculatorPrm struct { + ResultStorage ResultStorage + + ContainerStorage ContainerStorage + + PlacementCalculator PlacementCalculator + + SGStorage SGStorage + + AccountStorage AccountStorage + + Exchanger Exchanger +} + +// ResultStorage is an interface of storage of the audit results. +type ResultStorage interface { + // Must return all audit results by epoch number. + AuditResultsForEpoch(epoch uint64) ([]*audit.Result, error) +} + +// NodeInfo groups the data about the storage node +// necessary for calculating audit fees. +type NodeInfo interface { + // Must return storage price of the node for one epoch in GASe-12. + Price() *big.Int + + // Must return public key of the node. + PublicKey() []byte +} + +// ContainerInfo groups the data about NeoFS container +// necessary for calculating audit fee. +type ContainerInfo interface { + // Must return identifier of the container owner. + Owner() *owner.ID +} + +// ContainerStorage is an interface of +// storage of the NeoFS containers. +type ContainerStorage interface { + // Must return information about the container by ID. + ContainerInfo(*container.ID) (ContainerInfo, error) +} + +// PlacementCalculator is a component interface +// that builds placement vectors. +type PlacementCalculator interface { + // Must return information about the nodes from container cid of the epoch e. + ContainerNodes(e uint64, cid *container.ID) ([]NodeInfo, error) +} + +// SGInfo groups the data about NeoFS storage group +// necessary for calculating audit fee. +type SGInfo interface { + // Must return sum size of the all group members. + Size() uint64 +} + +// SGStorage is an interface of storage of the storage groups. +type SGStorage interface { + // Must return information about the storage group by address. + SGInfo(*object.Address) (SGInfo, error) +} + +// AccountStorage is an network member accounts interface. +type AccountStorage interface { + // Must resolve information about the storage node + // to its ID in system. + ResolveKey(NodeInfo) (*owner.ID, error) +} + +// Exchanger is an interface of monetary component. +type Exchanger interface { + // Must transfer amount of GASe-12 from sender to recipient. + // + // Amount must be positive. + Transfer(sender, recipient *owner.ID, amount *big.Int) +} diff --git a/pkg/innerring/processors/settlement/audit/util.go b/pkg/innerring/processors/settlement/audit/util.go new file mode 100644 index 000000000..c0208223b --- /dev/null +++ b/pkg/innerring/processors/settlement/audit/util.go @@ -0,0 +1,57 @@ +package audit + +import ( + "math/big" + + "github.com/nspcc-dev/neofs-api-go/pkg/owner" +) + +type transferTable struct { + txs map[string]map[string]*transferTx +} + +type transferTx struct { + from, to *owner.ID + + amount *big.Int +} + +func newTransferTable() *transferTable { + return &transferTable{ + txs: make(map[string]map[string]*transferTx), + } +} + +func (t *transferTable) transfer(tx *transferTx) { + from, to := tx.from.String(), tx.to.String() + if from == to { + return + } + + m, ok := t.txs[from] + if !ok { + if m, ok = t.txs[to]; ok { + from, to = to, from + tx.amount.Neg(tx.amount) + } else { + m = make(map[string]*transferTx, 1) + t.txs[from] = m + } + } + + tgt, ok := m[to] + if !ok { + m[to] = tx + return + } + + tgt.amount.Add(tgt.amount, tx.amount) +} + +func (t *transferTable) iterate(f func(*transferTx)) { + for _, m := range t.txs { + for _, tx := range m { + f(tx) + } + } +}