[#326] ir: Calculate audit settlements on new epoch

Calculate payments to storage nodes for the passed audit when changing the
epoch. The calculation results are wrapped in a call to the Balance contract
(one transaction per user-to-user transfer).

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-01-26 13:33:28 +03:00 committed by Alex Vanin
parent 685b593af3
commit 87c2c3ecc6
4 changed files with 293 additions and 11 deletions

193
pkg/innerring/audit.go Normal file
View file

@ -0,0 +1,193 @@
package innerring
import (
"context"
"math/big"
auditAPI "github.com/nspcc-dev/neofs-api-go/pkg/audit"
containerAPI "github.com/nspcc-dev/neofs-api-go/pkg/container"
netmapAPI "github.com/nspcc-dev/neofs-api-go/pkg/netmap"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-api-go/pkg/storagegroup"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit"
auditClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper"
balanceClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type auditSettlementDeps struct {
log *logger.Logger
cnrSrc container.Source
auditClient *auditClient.ClientWrapper
nmSrc netmap.Source
clientCache *ClientCache
balanceClient *balanceClient.Wrapper
}
type containerWrapper containerAPI.Container
type nodeInfoWrapper struct {
ni *netmapAPI.Node
}
type sgWrapper storagegroup.StorageGroup
func (s *sgWrapper) Size() uint64 {
return (*storagegroup.StorageGroup)(s).ValidationDataSize()
}
func (n nodeInfoWrapper) PublicKey() []byte {
return n.ni.PublicKey()
}
func (n nodeInfoWrapper) Price() *big.Int {
return big.NewInt(int64(n.ni.Price))
}
func (c *containerWrapper) Owner() *owner.ID {
return (*containerAPI.Container)(c).OwnerID()
}
func (a auditSettlementDeps) AuditResultsForEpoch(epoch uint64) ([]*auditAPI.Result, error) {
idList, err := a.auditClient.ListAuditResultIDByEpoch(epoch)
if err != nil {
return nil, errors.Wrap(err, "could not list audit results in sidechain")
}
res := make([]*auditAPI.Result, 0, len(idList))
for i := range idList {
r, err := a.auditClient.GetAuditResult(idList[i])
if err != nil {
return nil, errors.Wrap(err, "could not get audit result")
}
res = append(res, r)
}
return res, nil
}
func (a auditSettlementDeps) ContainerInfo(cid *containerAPI.ID) (audit.ContainerInfo, error) {
cnr, err := a.cnrSrc.Get(cid)
if err != nil {
return nil, errors.Wrap(err, "could not get container from storage")
}
return (*containerWrapper)(cnr), nil
}
func (a auditSettlementDeps) buildContainer(e uint64, cid *containerAPI.ID) (netmapAPI.ContainerNodes, *netmapAPI.Netmap, error) {
var (
nm *netmapAPI.Netmap
err error
)
if e > 0 {
nm, err = a.nmSrc.GetNetMapByEpoch(e)
} else {
nm, err = netmap.GetLatestNetworkMap(a.nmSrc)
}
if err != nil {
return nil, nil, errors.Wrap(err, "could not get network map from storage")
}
cnr, err := a.cnrSrc.Get(cid)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get container from sidechain")
}
cn, err := nm.GetContainerNodes(
cnr.PlacementPolicy(),
cid.ToV2().GetValue(), // may be replace pivot calculation to neofs-api-go
)
if err != nil {
return nil, nil, errors.Wrap(err, "could not calculate container nodes")
}
return cn, nm, nil
}
func (a auditSettlementDeps) ContainerNodes(e uint64, cid *containerAPI.ID) ([]audit.NodeInfo, error) {
cn, _, err := a.buildContainer(e, cid)
if err != nil {
return nil, err
}
ns := cn.Flatten()
res := make([]audit.NodeInfo, 0, len(ns))
for i := range ns {
res = append(res, &nodeInfoWrapper{
ni: ns[i],
})
}
return res, nil
}
func (a auditSettlementDeps) SGInfo(addr *object.Address) (audit.SGInfo, error) {
cn, nm, err := a.buildContainer(0, addr.ContainerID())
if err != nil {
return nil, err
}
sg, err := a.clientCache.getSG(context.Background(), addr, nm, cn)
if err != nil {
return nil, err
}
return (*sgWrapper)(sg), nil
}
func (a auditSettlementDeps) ResolveKey(ni audit.NodeInfo) (*owner.ID, error) {
w, err := owner.NEO3WalletFromPublicKey(crypto.UnmarshalPublicKey(ni.PublicKey()))
if err != nil {
return nil, err
}
id := owner.NewID()
id.SetNeo3Wallet(w)
return id, nil
}
var transferAuditDetails = []byte("settlement-audit")
func (a auditSettlementDeps) Transfer(sender, recipient *owner.ID, amount *big.Int) {
if !amount.IsInt64() {
a.log.Error("amount can not be represented as an int64",
zap.Stringer("value", amount),
)
return
}
amount64 := amount.Int64()
if amount64 == 0 {
amount64 = 1
}
if err := a.balanceClient.TransferX(balanceClient.TransferPrm{
Amount: amount64,
From: sender,
To: recipient,
Details: transferAuditDetails,
}); err != nil {
a.log.Error("transfer of funds for audit failed",
zap.String("error", err.Error()),
)
}
}

View file

@ -16,10 +16,13 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/neofs"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap"
auditSettlement "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper"
balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/subscriber"
audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager"
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
@ -51,6 +54,7 @@ type (
innerRingSize atomic.Int32
precision precision.Fixed8Converter
auditClient *auditWrapper.ClientWrapper
balanceClient *balanceWrapper.Wrapper
// internal variables
key *ecdsa.PrivateKey
@ -59,6 +63,8 @@ type (
predefinedValidators []keys.PublicKey
workers []func(context.Context)
auditSettlement *auditSettlement.Calculator
}
contracts struct {
@ -388,6 +394,48 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
return nil, err
}
cnrClient, err := invoke.NewNoFeeContainerClient(server.morphClient, server.contracts.container)
if err != nil {
return nil, err
}
nmClient, err := invoke.NewNoFeeNetmapClient(server.morphClient, server.contracts.netmap)
if err != nil {
return nil, err
}
balClient, err := invoke.NewNoFeeBalanceClient(server.morphClient, server.contracts.balance)
if err != nil {
return nil, err
}
auditCalcDeps := &auditSettlementDeps{
log: server.log,
cnrSrc: cnrClient,
auditClient: server.auditClient,
nmSrc: nmClient,
clientCache: clientCache,
balanceClient: balClient,
}
server.auditSettlement = auditSettlement.NewCalculator(
&auditSettlement.CalculatorPrm{
ResultStorage: auditCalcDeps,
ContainerStorage: auditCalcDeps,
PlacementCalculator: auditCalcDeps,
SGStorage: auditCalcDeps,
AccountStorage: auditCalcDeps,
Exchanger: auditCalcDeps,
},
auditSettlement.WithLogger(server.log),
)
server.subscribeNewEpoch(func(e netmapEvent.NewEpoch) {
server.auditSettlement.Calculate(&auditSettlement.CalculatePrm{
Epoch: e.EpochNumber(),
})
})
// todo: create vivid id component
return server, nil
@ -569,3 +617,26 @@ func (s *Server) tickTimers() {
s.blockTimers[i].Tick()
}
}
func (s *Server) subscribeNewEpoch(f func(netmapEvent.NewEpoch)) {
hi := event.HandlerInfo{}
// TODO: replace and share
const newEpochNotification = "NewEpoch"
hi.SetType(event.TypeFromString(newEpochNotification))
hi.SetScriptHash(s.contracts.netmap)
hi.SetHandler(s.onlyActiveEventHandler(func(ev event.Event) {
f(ev.(netmapEvent.NewEpoch))
}))
s.morphListener.RegisterHandler(hi)
}
func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) {
if s.IsActive() {
f(ev)
}
}
}

View file

@ -7,10 +7,13 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/audit"
auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/balance"
balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper"
morphContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
morphNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap"
wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
"github.com/pkg/errors"
)
const readOnlyFee = 0
@ -54,3 +57,18 @@ func NewNoFeeAuditClient(cli *client.Client, contract util.Uint160) (*auditWrapp
return auditWrapper.WrapClient(audit.New(staticClient)), nil
}
// NewNoFeeBalanceClient creates wrapper to work with Balance contract.
func NewNoFeeBalanceClient(cli *client.Client, contract util.Uint160) (*balanceWrapper.Wrapper, error) {
staticClient, err := client.NewStatic(cli, contract, readOnlyFee)
if err != nil {
return nil, errors.Wrap(err, "could not create static client of Balance contract")
}
enhancedBalanceClient, err := balance.New(staticClient)
if err != nil {
return nil, errors.Wrap(err, "could not create Balance contract client")
}
return balanceWrapper.New(enhancedBalanceClient)
}

View file

@ -53,21 +53,21 @@ func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client
// GetSG polls the container from audit task to get the object by id.
// Returns storage groups structure from received object.
func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.StorageGroup, error) {
nodes, err := placement.BuildObjectPlacement( // shuffle nodes
task.NetworkMap(),
task.ContainerNodes(),
id,
)
if err != nil {
return nil, fmt.Errorf("can't build object placement: %w", err)
}
sgAddress := new(object.Address)
sgAddress.SetContainerID(task.ContainerID())
sgAddress.SetObjectID(id)
return c.getSG(task.AuditContext(), sgAddress, task.NetworkMap(), task.ContainerNodes())
}
func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netmap.Netmap, cn netmap.ContainerNodes) (*storagegroup.StorageGroup, error) {
nodes, err := placement.BuildObjectPlacement(nm, cn, addr.ObjectID())
if err != nil {
return nil, fmt.Errorf("can't build object placement: %w", err)
}
getParams := new(client.GetObjectParams)
getParams.WithAddress(sgAddress)
getParams.WithAddress(addr)
for _, node := range placement.FlattenNodes(nodes) {
addr, err := network.IPAddrFromMultiaddr(node.Address())
@ -88,7 +88,7 @@ func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.Stor
continue
}
cctx, cancel := context.WithTimeout(task.AuditContext(), c.sgTimeout)
cctx, cancel := context.WithTimeout(ctx, c.sgTimeout)
obj, err := cli.GetObject(cctx, getParams)
cancel()