From 87c2c3ecc6861419a55f8150124984018c6f1e98 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 26 Jan 2021 13:33:28 +0300 Subject: [PATCH] [#326] ir: Calculate audit settlements on new epoch Calculate payments to storage nodes for the passed audit when changing the epoch. The calculation results are wrapped in a call to the Balance contract (one transaction per user-to-user transfer). Signed-off-by: Leonard Lyubich --- pkg/innerring/audit.go | 193 +++++++++++++++++++++++++++++++ pkg/innerring/innerring.go | 71 ++++++++++++ pkg/innerring/invoke/enhanced.go | 18 +++ pkg/innerring/rpc.go | 22 ++-- 4 files changed, 293 insertions(+), 11 deletions(-) create mode 100644 pkg/innerring/audit.go diff --git a/pkg/innerring/audit.go b/pkg/innerring/audit.go new file mode 100644 index 000000000..f277b0270 --- /dev/null +++ b/pkg/innerring/audit.go @@ -0,0 +1,193 @@ +package innerring + +import ( + "context" + "math/big" + + auditAPI "github.com/nspcc-dev/neofs-api-go/pkg/audit" + containerAPI "github.com/nspcc-dev/neofs-api-go/pkg/container" + netmapAPI "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-api-go/pkg/storagegroup" + crypto "github.com/nspcc-dev/neofs-crypto" + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit" + auditClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper" + balanceClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +type auditSettlementDeps struct { + log *logger.Logger + + cnrSrc container.Source + + auditClient *auditClient.ClientWrapper + + nmSrc netmap.Source + + clientCache *ClientCache + + balanceClient *balanceClient.Wrapper +} + +type containerWrapper containerAPI.Container + +type nodeInfoWrapper struct { + ni *netmapAPI.Node +} + +type sgWrapper storagegroup.StorageGroup + +func (s *sgWrapper) Size() uint64 { + return (*storagegroup.StorageGroup)(s).ValidationDataSize() +} + +func (n nodeInfoWrapper) PublicKey() []byte { + return n.ni.PublicKey() +} + +func (n nodeInfoWrapper) Price() *big.Int { + return big.NewInt(int64(n.ni.Price)) +} + +func (c *containerWrapper) Owner() *owner.ID { + return (*containerAPI.Container)(c).OwnerID() +} + +func (a auditSettlementDeps) AuditResultsForEpoch(epoch uint64) ([]*auditAPI.Result, error) { + idList, err := a.auditClient.ListAuditResultIDByEpoch(epoch) + if err != nil { + return nil, errors.Wrap(err, "could not list audit results in sidechain") + } + + res := make([]*auditAPI.Result, 0, len(idList)) + + for i := range idList { + r, err := a.auditClient.GetAuditResult(idList[i]) + if err != nil { + return nil, errors.Wrap(err, "could not get audit result") + } + + res = append(res, r) + } + + return res, nil +} + +func (a auditSettlementDeps) ContainerInfo(cid *containerAPI.ID) (audit.ContainerInfo, error) { + cnr, err := a.cnrSrc.Get(cid) + if err != nil { + return nil, errors.Wrap(err, "could not get container from storage") + } + + return (*containerWrapper)(cnr), nil +} + +func (a auditSettlementDeps) buildContainer(e uint64, cid *containerAPI.ID) (netmapAPI.ContainerNodes, *netmapAPI.Netmap, error) { + var ( + nm *netmapAPI.Netmap + err error + ) + + if e > 0 { + nm, err = a.nmSrc.GetNetMapByEpoch(e) + } else { + nm, err = netmap.GetLatestNetworkMap(a.nmSrc) + } + + if err != nil { + return nil, nil, errors.Wrap(err, "could not get network map from storage") + } + + cnr, err := a.cnrSrc.Get(cid) + if err != nil { + return nil, nil, errors.Wrap(err, "could not get container from sidechain") + } + + cn, err := nm.GetContainerNodes( + cnr.PlacementPolicy(), + cid.ToV2().GetValue(), // may be replace pivot calculation to neofs-api-go + ) + if err != nil { + return nil, nil, errors.Wrap(err, "could not calculate container nodes") + } + + return cn, nm, nil +} + +func (a auditSettlementDeps) ContainerNodes(e uint64, cid *containerAPI.ID) ([]audit.NodeInfo, error) { + cn, _, err := a.buildContainer(e, cid) + if err != nil { + return nil, err + } + + ns := cn.Flatten() + res := make([]audit.NodeInfo, 0, len(ns)) + + for i := range ns { + res = append(res, &nodeInfoWrapper{ + ni: ns[i], + }) + } + + return res, nil +} + +func (a auditSettlementDeps) SGInfo(addr *object.Address) (audit.SGInfo, error) { + cn, nm, err := a.buildContainer(0, addr.ContainerID()) + if err != nil { + return nil, err + } + + sg, err := a.clientCache.getSG(context.Background(), addr, nm, cn) + if err != nil { + return nil, err + } + + return (*sgWrapper)(sg), nil +} + +func (a auditSettlementDeps) ResolveKey(ni audit.NodeInfo) (*owner.ID, error) { + w, err := owner.NEO3WalletFromPublicKey(crypto.UnmarshalPublicKey(ni.PublicKey())) + if err != nil { + return nil, err + } + + id := owner.NewID() + id.SetNeo3Wallet(w) + + return id, nil +} + +var transferAuditDetails = []byte("settlement-audit") + +func (a auditSettlementDeps) Transfer(sender, recipient *owner.ID, amount *big.Int) { + if !amount.IsInt64() { + a.log.Error("amount can not be represented as an int64", + zap.Stringer("value", amount), + ) + + return + } + + amount64 := amount.Int64() + if amount64 == 0 { + amount64 = 1 + } + + if err := a.balanceClient.TransferX(balanceClient.TransferPrm{ + Amount: amount64, + From: sender, + To: recipient, + Details: transferAuditDetails, + }); err != nil { + a.log.Error("transfer of funds for audit failed", + zap.String("error", err.Error()), + ) + } +} diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index cfc9acda8..213bfd3c9 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -16,10 +16,13 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/neofs" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap" + auditSettlement "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/morph/client" auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper" + balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" + netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager" util2 "github.com/nspcc-dev/neofs-node/pkg/util" @@ -51,6 +54,7 @@ type ( innerRingSize atomic.Int32 precision precision.Fixed8Converter auditClient *auditWrapper.ClientWrapper + balanceClient *balanceWrapper.Wrapper // internal variables key *ecdsa.PrivateKey @@ -59,6 +63,8 @@ type ( predefinedValidators []keys.PublicKey workers []func(context.Context) + + auditSettlement *auditSettlement.Calculator } contracts struct { @@ -388,6 +394,48 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } + cnrClient, err := invoke.NewNoFeeContainerClient(server.morphClient, server.contracts.container) + if err != nil { + return nil, err + } + + nmClient, err := invoke.NewNoFeeNetmapClient(server.morphClient, server.contracts.netmap) + if err != nil { + return nil, err + } + + balClient, err := invoke.NewNoFeeBalanceClient(server.morphClient, server.contracts.balance) + if err != nil { + return nil, err + } + + auditCalcDeps := &auditSettlementDeps{ + log: server.log, + cnrSrc: cnrClient, + auditClient: server.auditClient, + nmSrc: nmClient, + clientCache: clientCache, + balanceClient: balClient, + } + + server.auditSettlement = auditSettlement.NewCalculator( + &auditSettlement.CalculatorPrm{ + ResultStorage: auditCalcDeps, + ContainerStorage: auditCalcDeps, + PlacementCalculator: auditCalcDeps, + SGStorage: auditCalcDeps, + AccountStorage: auditCalcDeps, + Exchanger: auditCalcDeps, + }, + auditSettlement.WithLogger(server.log), + ) + + server.subscribeNewEpoch(func(e netmapEvent.NewEpoch) { + server.auditSettlement.Calculate(&auditSettlement.CalculatePrm{ + Epoch: e.EpochNumber(), + }) + }) + // todo: create vivid id component return server, nil @@ -569,3 +617,26 @@ func (s *Server) tickTimers() { s.blockTimers[i].Tick() } } + +func (s *Server) subscribeNewEpoch(f func(netmapEvent.NewEpoch)) { + hi := event.HandlerInfo{} + + // TODO: replace and share + const newEpochNotification = "NewEpoch" + + hi.SetType(event.TypeFromString(newEpochNotification)) + hi.SetScriptHash(s.contracts.netmap) + hi.SetHandler(s.onlyActiveEventHandler(func(ev event.Event) { + f(ev.(netmapEvent.NewEpoch)) + })) + + s.morphListener.RegisterHandler(hi) +} + +func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler { + return func(ev event.Event) { + if s.IsActive() { + f(ev) + } + } +} diff --git a/pkg/innerring/invoke/enhanced.go b/pkg/innerring/invoke/enhanced.go index 20dc7764b..b89d55bd3 100644 --- a/pkg/innerring/invoke/enhanced.go +++ b/pkg/innerring/invoke/enhanced.go @@ -7,10 +7,13 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit" auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper" + "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance" + balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper" morphContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" morphNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" + "github.com/pkg/errors" ) const readOnlyFee = 0 @@ -54,3 +57,18 @@ func NewNoFeeAuditClient(cli *client.Client, contract util.Uint160) (*auditWrapp return auditWrapper.WrapClient(audit.New(staticClient)), nil } + +// NewNoFeeBalanceClient creates wrapper to work with Balance contract. +func NewNoFeeBalanceClient(cli *client.Client, contract util.Uint160) (*balanceWrapper.Wrapper, error) { + staticClient, err := client.NewStatic(cli, contract, readOnlyFee) + if err != nil { + return nil, errors.Wrap(err, "could not create static client of Balance contract") + } + + enhancedBalanceClient, err := balance.New(staticClient) + if err != nil { + return nil, errors.Wrap(err, "could not create Balance contract client") + } + + return balanceWrapper.New(enhancedBalanceClient) +} diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index dbf4d54e4..756dc82b0 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -53,21 +53,21 @@ func (c *ClientCache) Get(address string, opts ...client.Option) (*client.Client // GetSG polls the container from audit task to get the object by id. // Returns storage groups structure from received object. func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.StorageGroup, error) { - nodes, err := placement.BuildObjectPlacement( // shuffle nodes - task.NetworkMap(), - task.ContainerNodes(), - id, - ) - if err != nil { - return nil, fmt.Errorf("can't build object placement: %w", err) - } - sgAddress := new(object.Address) sgAddress.SetContainerID(task.ContainerID()) sgAddress.SetObjectID(id) + return c.getSG(task.AuditContext(), sgAddress, task.NetworkMap(), task.ContainerNodes()) +} + +func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netmap.Netmap, cn netmap.ContainerNodes) (*storagegroup.StorageGroup, error) { + nodes, err := placement.BuildObjectPlacement(nm, cn, addr.ObjectID()) + if err != nil { + return nil, fmt.Errorf("can't build object placement: %w", err) + } + getParams := new(client.GetObjectParams) - getParams.WithAddress(sgAddress) + getParams.WithAddress(addr) for _, node := range placement.FlattenNodes(nodes) { addr, err := network.IPAddrFromMultiaddr(node.Address()) @@ -88,7 +88,7 @@ func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.Stor continue } - cctx, cancel := context.WithTimeout(task.AuditContext(), c.sgTimeout) + cctx, cancel := context.WithTimeout(ctx, c.sgTimeout) obj, err := cli.GetObject(cctx, getParams) cancel()