forked from TrueCloudLab/frostfs-node
[#265] innerring: Select containers to audit
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
87e1252065
commit
3d3d058b05
8 changed files with 263 additions and 11 deletions
|
@ -42,6 +42,7 @@ type (
|
|||
mainnetClient *client.Client
|
||||
epochCounter atomic.Uint64
|
||||
innerRingIndex atomic.Int32
|
||||
innerRingSize atomic.Int32
|
||||
precision precision.Fixed8Converter
|
||||
|
||||
// internal variables
|
||||
|
@ -440,7 +441,7 @@ func (s *Server) initConfigFromBlockchain() error {
|
|||
key := &s.key.PublicKey
|
||||
|
||||
// check if node inside inner ring list and what index it has
|
||||
index, err := invoke.InnerRingIndex(s.mainnetClient, s.contracts.neofs, key)
|
||||
index, size, err := invoke.InnerRingIndex(s.mainnetClient, s.contracts.neofs, key)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't read inner ring list")
|
||||
}
|
||||
|
@ -452,6 +453,7 @@ func (s *Server) initConfigFromBlockchain() error {
|
|||
}
|
||||
|
||||
s.epochCounter.Store(uint64(epoch))
|
||||
s.innerRingSize.Store(size)
|
||||
s.innerRingIndex.Store(index)
|
||||
s.precision.SetBalancePrecision(balancePrecision)
|
||||
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
package invoke
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -21,9 +25,12 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
var ErrParseTestInvoke = errors.New("can't parse NEO node response")
|
||||
|
||||
const (
|
||||
putContainerMethod = "put"
|
||||
deleteContainerMethod = "delete"
|
||||
listContainersMethod = "list"
|
||||
)
|
||||
|
||||
// RegisterContainer invokes Put method.
|
||||
|
@ -50,3 +57,47 @@ func RemoveContainer(cli *client.Client, con util.Uint160, p *RemoveContainerPar
|
|||
p.Signature,
|
||||
)
|
||||
}
|
||||
|
||||
func ListContainers(cli *client.Client, con util.Uint160) ([]*container.ID, error) {
|
||||
if cli == nil {
|
||||
return nil, client.ErrNilClient
|
||||
}
|
||||
|
||||
item, err := cli.TestInvoke(con, listContainersMethod, []byte{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(item) < 1 {
|
||||
return nil, errors.Wrap(ErrParseTestInvoke, "nested array expected")
|
||||
}
|
||||
|
||||
rawIDs, err := client.ArrayFromStackItem(item[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]*container.ID, 0, len(rawIDs))
|
||||
|
||||
var bufHash [sha256.Size]byte
|
||||
|
||||
for i := range rawIDs {
|
||||
cid, err := client.BytesFromStackItem(rawIDs[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(cid) != sha256.Size {
|
||||
return nil, errors.Wrap(ErrParseTestInvoke, "invalid container ID size")
|
||||
}
|
||||
|
||||
copy(bufHash[:], cid)
|
||||
|
||||
containerID := container.NewID()
|
||||
containerID.SetSHA256(bufHash)
|
||||
|
||||
result = append(result, containerID)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
|
|
@ -74,23 +74,24 @@ func CashOutCheque(cli *client.Client, con util.Uint160, p *ChequeParams) error
|
|||
)
|
||||
}
|
||||
|
||||
// InnerRingIndex returns index of the `key` in the inner ring list from sidechain.
|
||||
// If key is not in the inner ring list, then returns `-1`.
|
||||
func InnerRingIndex(cli *client.Client, con util.Uint160, key *ecdsa.PublicKey) (int32, error) {
|
||||
// InnerRingIndex returns index of the `key` in the inner ring list from sidechain
|
||||
// along with total size of inner ring list. If key is not in the inner ring list,
|
||||
// then returns `-1` as index.
|
||||
func InnerRingIndex(cli *client.Client, con util.Uint160, key *ecdsa.PublicKey) (int32, int32, error) {
|
||||
if cli == nil {
|
||||
return 0, client.ErrNilClient
|
||||
return 0, 0, client.ErrNilClient
|
||||
}
|
||||
|
||||
nodePublicKey := crypto.MarshalPublicKey(key)
|
||||
|
||||
data, err := cli.TestInvoke(con, innerRingListMethod)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
irNodes, err := client.ArrayFromStackItem(data[0])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
var result int32 = -1
|
||||
|
@ -98,12 +99,12 @@ func InnerRingIndex(cli *client.Client, con util.Uint160, key *ecdsa.PublicKey)
|
|||
for i := range irNodes {
|
||||
key, err := client.ArrayFromStackItem(irNodes[i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
keyValue, err := client.BytesFromStackItem(key[0])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
if bytes.Equal(keyValue, nodePublicKey) {
|
||||
|
@ -112,5 +113,5 @@ func InnerRingIndex(cli *client.Client, con util.Uint160, key *ecdsa.PublicKey)
|
|||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return result, int32(len(irNodes)), nil
|
||||
}
|
||||
|
|
|
@ -1,5 +1,23 @@
|
|||
package audit
|
||||
|
||||
import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (ap *Processor) processStartAudit(epoch uint64) {
|
||||
ap.log.Info("flushing left audit results and refilling queue")
|
||||
// todo: flush left audit results from audit result cache
|
||||
|
||||
containers, err := ap.selectContainersToAudit(epoch)
|
||||
if err != nil {
|
||||
ap.log.Error("container selection failure",
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ap.log.Info("select containers for audit", zap.Int("amount", len(containers)))
|
||||
|
||||
// todo: for each container get list of storage groups
|
||||
// todo: for each container create audit result template in audit result cache
|
||||
// todo: for each container push audit tasks into queue
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ type (
|
|||
// Indexer is a callback interface for inner ring global state.
|
||||
Indexer interface {
|
||||
Index() int32
|
||||
InnerRingSize() int32
|
||||
}
|
||||
|
||||
// Processor of events related with data audit.
|
||||
|
|
60
pkg/innerring/processors/audit/scheduler.go
Normal file
60
pkg/innerring/processors/audit/scheduler.go
Normal file
|
@ -0,0 +1,60 @@
|
|||
package audit
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var ErrInvalidIRNode = errors.New("node is not in the inner ring list")
|
||||
|
||||
func (ap *Processor) selectContainersToAudit(epoch uint64) ([]*container.ID, error) {
|
||||
containers, err := invoke.ListContainers(ap.morphClient, ap.containerContract)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't get list of containers to start audit")
|
||||
}
|
||||
|
||||
// consider getting extra information about container complexity from
|
||||
// audit contract there
|
||||
|
||||
sort.Slice(containers, func(i, j int) bool {
|
||||
return strings.Compare(containers[i].String(), containers[j].String()) < 0
|
||||
})
|
||||
|
||||
ind := ap.irList.Index()
|
||||
irSize := ap.irList.InnerRingSize()
|
||||
|
||||
if ind < 0 || ind >= irSize {
|
||||
return nil, ErrInvalidIRNode
|
||||
}
|
||||
|
||||
return Select(containers, epoch, uint64(ind), uint64(irSize)), nil
|
||||
}
|
||||
|
||||
func Select(ids []*container.ID, epoch, index, size uint64) []*container.ID {
|
||||
if index >= size {
|
||||
return nil
|
||||
}
|
||||
|
||||
var a, b uint64
|
||||
|
||||
ln := uint64(len(ids))
|
||||
pivot := ln % size
|
||||
delta := ln / size
|
||||
|
||||
index = (index + epoch) % size
|
||||
if index < pivot {
|
||||
a = delta + 1
|
||||
} else {
|
||||
a = delta
|
||||
b = pivot
|
||||
}
|
||||
|
||||
from := a*index + b
|
||||
to := a*(index+1) + b
|
||||
|
||||
return ids[from:to]
|
||||
}
|
113
pkg/innerring/processors/audit/scheduler_test.go
Normal file
113
pkg/innerring/processors/audit/scheduler_test.go
Normal file
|
@ -0,0 +1,113 @@
|
|||
package audit_test
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSelect(t *testing.T) {
|
||||
cids := generateContainers(10)
|
||||
|
||||
t.Run("invalid input", func(t *testing.T) {
|
||||
require.Empty(t, audit.Select(cids, 0, 0, 0))
|
||||
})
|
||||
|
||||
t.Run("even split", func(t *testing.T) {
|
||||
const irSize = 5 // every node takes two audit nodes
|
||||
|
||||
m := hitMap(cids)
|
||||
|
||||
for i := 0; i < irSize; i++ {
|
||||
s := audit.Select(cids, 0, uint64(i), irSize)
|
||||
require.Equal(t, len(cids)/irSize, len(s))
|
||||
|
||||
for _, id := range s {
|
||||
n, ok := m[id.String()]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 0, n)
|
||||
m[id.String()] = 1
|
||||
}
|
||||
}
|
||||
|
||||
require.True(t, allHit(m))
|
||||
})
|
||||
|
||||
t.Run("odd split", func(t *testing.T) {
|
||||
const irSize = 3
|
||||
|
||||
m := hitMap(cids)
|
||||
|
||||
for i := 0; i < irSize; i++ {
|
||||
s := audit.Select(cids, 0, uint64(i), irSize)
|
||||
|
||||
for _, id := range s {
|
||||
n, ok := m[id.String()]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 0, n)
|
||||
m[id.String()] = 1
|
||||
}
|
||||
}
|
||||
|
||||
require.True(t, allHit(m))
|
||||
})
|
||||
|
||||
t.Run("epoch shift", func(t *testing.T) {
|
||||
const irSize = 4
|
||||
|
||||
m := hitMap(cids)
|
||||
|
||||
for i := 0; i < irSize; i++ {
|
||||
s := audit.Select(cids, uint64(i), 0, irSize)
|
||||
|
||||
for _, id := range s {
|
||||
n, ok := m[id.String()]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 0, n)
|
||||
m[id.String()] = 1
|
||||
}
|
||||
}
|
||||
|
||||
require.True(t, allHit(m))
|
||||
})
|
||||
}
|
||||
|
||||
func generateContainers(n int) []*container.ID {
|
||||
var buf [sha256.Size]byte
|
||||
result := make([]*container.ID, 0, n)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
_, _ = rand.Read(buf[:])
|
||||
|
||||
cid := container.NewID()
|
||||
cid.SetSHA256(buf)
|
||||
|
||||
result = append(result, cid)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func hitMap(ids []*container.ID) map[string]int {
|
||||
result := make(map[string]int, len(ids))
|
||||
|
||||
for _, id := range ids {
|
||||
result[id.String()] = 0
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func allHit(m map[string]int) bool {
|
||||
for _, v := range m {
|
||||
if v == 0 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
|
@ -28,6 +28,12 @@ func (s *Server) Index() int32 {
|
|||
return s.innerRingIndex.Load()
|
||||
}
|
||||
|
||||
// InnerRingSize is a getter for a global size of inner ring list. This value
|
||||
// paired with inner ring index.
|
||||
func (s *Server) InnerRingSize() int32 {
|
||||
return s.innerRingSize.Load()
|
||||
}
|
||||
|
||||
func (s *Server) voteForSidechainValidator(validators []keys.PublicKey) error {
|
||||
index := s.Index()
|
||||
if index < 0 || index >= alphabetContractsN {
|
||||
|
|
Loading…
Reference in a new issue