[#326] ir/settlement: Implement calculator of settlements for audit
Implement component that analyzes audit results and generates transactions for payment of awards for successfully passed audit. When calculating the total fee, the declared price of the node (attribute) and the total volume of storage groups, which were successfully audited by the container, are taken into account. In one call the calculator processes all audit results for the previous epoch (relative to the calculated parameter). Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
20ad0ae82a
commit
685b593af3
4 changed files with 464 additions and 0 deletions
269
pkg/innerring/processors/settlement/audit/calculate.go
Normal file
269
pkg/innerring/processors/settlement/audit/calculate.go
Normal file
|
@ -0,0 +1,269 @@
|
|||
package audit
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"math/big"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/audit"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// CalculatePrm groups required parameters of
|
||||
// Calculator.CalculateForEpoch call.
|
||||
type CalculatePrm struct {
|
||||
// Number of epoch to perform the calculation.
|
||||
Epoch uint64
|
||||
}
|
||||
|
||||
type singleResultCtx struct {
|
||||
eAudit uint64
|
||||
|
||||
auditResult *audit.Result
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
cid *container.ID
|
||||
|
||||
txTable *transferTable
|
||||
|
||||
cnrInfo ContainerInfo
|
||||
|
||||
cnrNodes []NodeInfo
|
||||
|
||||
passNodes map[string]NodeInfo
|
||||
|
||||
sumSGSize *big.Int
|
||||
}
|
||||
|
||||
var bigGB = big.NewInt(1 << 30)
|
||||
|
||||
// Calculate calculates payments for audit results in a specific epoch of the network.
|
||||
// Wraps the results in a money transfer transaction and sends it to the network.
|
||||
func (c *Calculator) Calculate(p *CalculatePrm) {
|
||||
log := c.opts.log.With(
|
||||
zap.Uint64("current epoch", p.Epoch),
|
||||
)
|
||||
|
||||
log.Info("calculate audit settlements")
|
||||
|
||||
log.Debug("getting results for the previous epoch")
|
||||
|
||||
auditResults, err := c.prm.ResultStorage.AuditResultsForEpoch(p.Epoch - 1)
|
||||
if err != nil {
|
||||
log.Error("could not collect audit results")
|
||||
return
|
||||
} else if len(auditResults) == 0 {
|
||||
log.Debug("no audit results in previous epoch")
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("processing audit results",
|
||||
zap.Int("number", len(auditResults)),
|
||||
)
|
||||
|
||||
table := newTransferTable()
|
||||
|
||||
for i := range auditResults {
|
||||
c.processResult(&singleResultCtx{
|
||||
log: log,
|
||||
auditResult: auditResults[i],
|
||||
txTable: table,
|
||||
})
|
||||
}
|
||||
|
||||
log.Debug("processing transfers")
|
||||
|
||||
table.iterate(func(tx *transferTx) {
|
||||
c.prm.Exchanger.Transfer(tx.from, tx.to, tx.amount)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Calculator) processResult(ctx *singleResultCtx) {
|
||||
ctx.log = ctx.log.With(
|
||||
zap.Stringer("cid", ctx.containerID()),
|
||||
zap.Uint64("audit epoch", ctx.auditResult.AuditEpoch()),
|
||||
)
|
||||
|
||||
ctx.log.Debug("reading information about the container")
|
||||
|
||||
ok := c.readContainerInfo(ctx)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
ctx.log.Debug("building placement")
|
||||
|
||||
ok = c.buildPlacement(ctx)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
ctx.log.Debug("collecting passed nodes")
|
||||
|
||||
ok = c.collectPassNodes(ctx)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
ctx.log.Debug("calculating sum of the sizes of all storage groups")
|
||||
|
||||
ok = c.sumSGSizes(ctx)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
ctx.log.Debug("filling transfer table")
|
||||
|
||||
c.fillTransferTable(ctx)
|
||||
}
|
||||
|
||||
func (c *Calculator) readContainerInfo(ctx *singleResultCtx) bool {
|
||||
var err error
|
||||
|
||||
ctx.cnrInfo, err = c.prm.ContainerStorage.ContainerInfo(ctx.auditResult.ContainerID())
|
||||
if err != nil {
|
||||
ctx.log.Error("could not get container info",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (c *Calculator) buildPlacement(ctx *singleResultCtx) bool {
|
||||
var err error
|
||||
|
||||
ctx.cnrNodes, err = c.prm.PlacementCalculator.ContainerNodes(ctx.auditEpoch(), ctx.containerID())
|
||||
if err != nil {
|
||||
ctx.log.Error("could not get container nodes",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
empty := len(ctx.cnrNodes) == 0
|
||||
if empty {
|
||||
ctx.log.Debug("empty list of container nodes")
|
||||
}
|
||||
|
||||
return err == nil && !empty
|
||||
}
|
||||
|
||||
func (c *Calculator) collectPassNodes(ctx *singleResultCtx) bool {
|
||||
ctx.passNodes = make(map[string]NodeInfo)
|
||||
|
||||
loop:
|
||||
for _, cnrNode := range ctx.cnrNodes {
|
||||
for _, passNode := range ctx.auditResult.PassNodes() {
|
||||
if !bytes.Equal(cnrNode.PublicKey(), passNode) {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, failNode := range ctx.auditResult.FailNodes() {
|
||||
if bytes.Equal(cnrNode.PublicKey(), failNode) {
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
|
||||
ctx.passNodes[hex.EncodeToString(passNode)] = cnrNode
|
||||
}
|
||||
}
|
||||
|
||||
empty := len(ctx.passNodes) == 0
|
||||
if empty {
|
||||
ctx.log.Debug("none of the container nodes passed the audit")
|
||||
}
|
||||
|
||||
return !empty
|
||||
}
|
||||
|
||||
func (c *Calculator) sumSGSizes(ctx *singleResultCtx) bool {
|
||||
passedSG := ctx.auditResult.PassSG()
|
||||
|
||||
if len(passedSG) == 0 {
|
||||
ctx.log.Debug("empty list of passed SG")
|
||||
return false
|
||||
}
|
||||
|
||||
sumPassSGSize := uint64(0)
|
||||
|
||||
addr := object.NewAddress()
|
||||
addr.SetContainerID(ctx.containerID())
|
||||
|
||||
for _, sgID := range ctx.auditResult.PassSG() {
|
||||
addr.SetObjectID(sgID)
|
||||
|
||||
sgInfo, err := c.prm.SGStorage.SGInfo(addr)
|
||||
if err != nil {
|
||||
ctx.log.Error("could not get SG info",
|
||||
zap.Stringer("id", sgID),
|
||||
)
|
||||
|
||||
return false // we also can continue and calculate at least some part
|
||||
}
|
||||
|
||||
sumPassSGSize += sgInfo.Size()
|
||||
}
|
||||
|
||||
if sumPassSGSize == 0 {
|
||||
ctx.log.Debug("zero sum SG size")
|
||||
return false
|
||||
}
|
||||
|
||||
ctx.sumSGSize = big.NewInt(int64(sumPassSGSize))
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Calculator) fillTransferTable(ctx *singleResultCtx) bool {
|
||||
cnrOwner := ctx.cnrInfo.Owner()
|
||||
|
||||
for k, info := range ctx.passNodes {
|
||||
ownerID, err := c.prm.AccountStorage.ResolveKey(info)
|
||||
if err != nil {
|
||||
ctx.log.Error("could not resolve public key of the storage node",
|
||||
zap.String("error", err.Error()),
|
||||
zap.String("key", k),
|
||||
)
|
||||
|
||||
return false // we also can continue and calculate at least some part
|
||||
}
|
||||
|
||||
price := info.Price()
|
||||
|
||||
ctx.log.Debug("calculating storage node salary for audit (GASe-12)",
|
||||
zap.Stringer("sum SG size", ctx.sumSGSize),
|
||||
zap.Stringer("price", price),
|
||||
)
|
||||
|
||||
fee := big.NewInt(0).Mul(price, ctx.sumSGSize)
|
||||
fee.Div(fee, bigGB)
|
||||
|
||||
ctx.txTable.transfer(&transferTx{
|
||||
from: cnrOwner,
|
||||
to: ownerID,
|
||||
amount: fee,
|
||||
})
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *singleResultCtx) containerID() *container.ID {
|
||||
if c.cid == nil {
|
||||
c.cid = c.auditResult.ContainerID()
|
||||
}
|
||||
|
||||
return c.cid
|
||||
}
|
||||
|
||||
func (c *singleResultCtx) auditEpoch() uint64 {
|
||||
if c.eAudit == 0 {
|
||||
c.eAudit = c.auditResult.AuditEpoch()
|
||||
}
|
||||
|
||||
return c.eAudit
|
||||
}
|
48
pkg/innerring/processors/settlement/audit/calculator.go
Normal file
48
pkg/innerring/processors/settlement/audit/calculator.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
package audit
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Calculator represents component for calculating payments
|
||||
// based on data audit results and sending remittances to the chain.
|
||||
type Calculator struct {
|
||||
prm *CalculatorPrm
|
||||
|
||||
opts *options
|
||||
}
|
||||
|
||||
// CalculatorOption is a Calculator constructor's option.
|
||||
type CalculatorOption func(*options)
|
||||
|
||||
type options struct {
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultOptions() *options {
|
||||
return &options{
|
||||
log: zap.L(),
|
||||
}
|
||||
}
|
||||
|
||||
// NewCalculator creates, initializes and returns new Calculator instance.
|
||||
func NewCalculator(p *CalculatorPrm, opts ...CalculatorOption) *Calculator {
|
||||
o := defaultOptions()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](o)
|
||||
}
|
||||
|
||||
return &Calculator{
|
||||
prm: p,
|
||||
opts: o,
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to specify logging component.
|
||||
func WithLogger(l *logger.Logger) CalculatorOption {
|
||||
return func(o *options) {
|
||||
o.log = l
|
||||
}
|
||||
}
|
90
pkg/innerring/processors/settlement/audit/prm.go
Normal file
90
pkg/innerring/processors/settlement/audit/prm.go
Normal file
|
@ -0,0 +1,90 @@
|
|||
package audit
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/audit"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||
)
|
||||
|
||||
// CalculatorPrm groups the parameters of Calculator's constructor.
|
||||
type CalculatorPrm struct {
|
||||
ResultStorage ResultStorage
|
||||
|
||||
ContainerStorage ContainerStorage
|
||||
|
||||
PlacementCalculator PlacementCalculator
|
||||
|
||||
SGStorage SGStorage
|
||||
|
||||
AccountStorage AccountStorage
|
||||
|
||||
Exchanger Exchanger
|
||||
}
|
||||
|
||||
// ResultStorage is an interface of storage of the audit results.
|
||||
type ResultStorage interface {
|
||||
// Must return all audit results by epoch number.
|
||||
AuditResultsForEpoch(epoch uint64) ([]*audit.Result, error)
|
||||
}
|
||||
|
||||
// NodeInfo groups the data about the storage node
|
||||
// necessary for calculating audit fees.
|
||||
type NodeInfo interface {
|
||||
// Must return storage price of the node for one epoch in GASe-12.
|
||||
Price() *big.Int
|
||||
|
||||
// Must return public key of the node.
|
||||
PublicKey() []byte
|
||||
}
|
||||
|
||||
// ContainerInfo groups the data about NeoFS container
|
||||
// necessary for calculating audit fee.
|
||||
type ContainerInfo interface {
|
||||
// Must return identifier of the container owner.
|
||||
Owner() *owner.ID
|
||||
}
|
||||
|
||||
// ContainerStorage is an interface of
|
||||
// storage of the NeoFS containers.
|
||||
type ContainerStorage interface {
|
||||
// Must return information about the container by ID.
|
||||
ContainerInfo(*container.ID) (ContainerInfo, error)
|
||||
}
|
||||
|
||||
// PlacementCalculator is a component interface
|
||||
// that builds placement vectors.
|
||||
type PlacementCalculator interface {
|
||||
// Must return information about the nodes from container cid of the epoch e.
|
||||
ContainerNodes(e uint64, cid *container.ID) ([]NodeInfo, error)
|
||||
}
|
||||
|
||||
// SGInfo groups the data about NeoFS storage group
|
||||
// necessary for calculating audit fee.
|
||||
type SGInfo interface {
|
||||
// Must return sum size of the all group members.
|
||||
Size() uint64
|
||||
}
|
||||
|
||||
// SGStorage is an interface of storage of the storage groups.
|
||||
type SGStorage interface {
|
||||
// Must return information about the storage group by address.
|
||||
SGInfo(*object.Address) (SGInfo, error)
|
||||
}
|
||||
|
||||
// AccountStorage is an network member accounts interface.
|
||||
type AccountStorage interface {
|
||||
// Must resolve information about the storage node
|
||||
// to its ID in system.
|
||||
ResolveKey(NodeInfo) (*owner.ID, error)
|
||||
}
|
||||
|
||||
// Exchanger is an interface of monetary component.
|
||||
type Exchanger interface {
|
||||
// Must transfer amount of GASe-12 from sender to recipient.
|
||||
//
|
||||
// Amount must be positive.
|
||||
Transfer(sender, recipient *owner.ID, amount *big.Int)
|
||||
}
|
57
pkg/innerring/processors/settlement/audit/util.go
Normal file
57
pkg/innerring/processors/settlement/audit/util.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
package audit
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||
)
|
||||
|
||||
type transferTable struct {
|
||||
txs map[string]map[string]*transferTx
|
||||
}
|
||||
|
||||
type transferTx struct {
|
||||
from, to *owner.ID
|
||||
|
||||
amount *big.Int
|
||||
}
|
||||
|
||||
func newTransferTable() *transferTable {
|
||||
return &transferTable{
|
||||
txs: make(map[string]map[string]*transferTx),
|
||||
}
|
||||
}
|
||||
|
||||
func (t *transferTable) transfer(tx *transferTx) {
|
||||
from, to := tx.from.String(), tx.to.String()
|
||||
if from == to {
|
||||
return
|
||||
}
|
||||
|
||||
m, ok := t.txs[from]
|
||||
if !ok {
|
||||
if m, ok = t.txs[to]; ok {
|
||||
from, to = to, from
|
||||
tx.amount.Neg(tx.amount)
|
||||
} else {
|
||||
m = make(map[string]*transferTx, 1)
|
||||
t.txs[from] = m
|
||||
}
|
||||
}
|
||||
|
||||
tgt, ok := m[to]
|
||||
if !ok {
|
||||
m[to] = tx
|
||||
return
|
||||
}
|
||||
|
||||
tgt.amount.Add(tgt.amount, tx.amount)
|
||||
}
|
||||
|
||||
func (t *transferTable) iterate(f func(*transferTx)) {
|
||||
for _, m := range t.txs {
|
||||
for _, tx := range m {
|
||||
f(tx)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue