From 3d3d058b05c6f996511672d566ce455452d23b71 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 18 Dec 2020 15:52:27 +0300 Subject: [PATCH] [#265] innerring: Select containers to audit Signed-off-by: Alex Vanin --- pkg/innerring/innerring.go | 4 +- pkg/innerring/invoke/container.go | 51 ++++++++ pkg/innerring/invoke/neofs.go | 19 +-- pkg/innerring/processors/audit/process.go | 20 +++- pkg/innerring/processors/audit/processor.go | 1 + pkg/innerring/processors/audit/scheduler.go | 60 ++++++++++ .../processors/audit/scheduler_test.go | 113 ++++++++++++++++++ pkg/innerring/state.go | 6 + 8 files changed, 263 insertions(+), 11 deletions(-) create mode 100644 pkg/innerring/processors/audit/scheduler.go create mode 100644 pkg/innerring/processors/audit/scheduler_test.go diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index be4bc262..06310dae 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -42,6 +42,7 @@ type ( mainnetClient *client.Client epochCounter atomic.Uint64 innerRingIndex atomic.Int32 + innerRingSize atomic.Int32 precision precision.Fixed8Converter // internal variables @@ -440,7 +441,7 @@ func (s *Server) initConfigFromBlockchain() error { key := &s.key.PublicKey // check if node inside inner ring list and what index it has - index, err := invoke.InnerRingIndex(s.mainnetClient, s.contracts.neofs, key) + index, size, err := invoke.InnerRingIndex(s.mainnetClient, s.contracts.neofs, key) if err != nil { return errors.Wrap(err, "can't read inner ring list") } @@ -452,6 +453,7 @@ func (s *Server) initConfigFromBlockchain() error { } s.epochCounter.Store(uint64(epoch)) + s.innerRingSize.Store(size) s.innerRingIndex.Store(index) s.precision.SetBalancePrecision(balancePrecision) diff --git a/pkg/innerring/invoke/container.go b/pkg/innerring/invoke/container.go index ff3b43f9..770c8698 100644 --- a/pkg/innerring/invoke/container.go +++ b/pkg/innerring/invoke/container.go @@ -1,9 +1,13 @@ package invoke import ( + "crypto/sha256" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-node/pkg/morph/client" + "github.com/pkg/errors" ) type ( @@ -21,9 +25,12 @@ type ( } ) +var ErrParseTestInvoke = errors.New("can't parse NEO node response") + const ( putContainerMethod = "put" deleteContainerMethod = "delete" + listContainersMethod = "list" ) // RegisterContainer invokes Put method. @@ -50,3 +57,47 @@ func RemoveContainer(cli *client.Client, con util.Uint160, p *RemoveContainerPar p.Signature, ) } + +func ListContainers(cli *client.Client, con util.Uint160) ([]*container.ID, error) { + if cli == nil { + return nil, client.ErrNilClient + } + + item, err := cli.TestInvoke(con, listContainersMethod, []byte{}) + if err != nil { + return nil, err + } + + if len(item) < 1 { + return nil, errors.Wrap(ErrParseTestInvoke, "nested array expected") + } + + rawIDs, err := client.ArrayFromStackItem(item[0]) + if err != nil { + return nil, err + } + + result := make([]*container.ID, 0, len(rawIDs)) + + var bufHash [sha256.Size]byte + + for i := range rawIDs { + cid, err := client.BytesFromStackItem(rawIDs[i]) + if err != nil { + return nil, err + } + + if len(cid) != sha256.Size { + return nil, errors.Wrap(ErrParseTestInvoke, "invalid container ID size") + } + + copy(bufHash[:], cid) + + containerID := container.NewID() + containerID.SetSHA256(bufHash) + + result = append(result, containerID) + } + + return result, nil +} diff --git a/pkg/innerring/invoke/neofs.go b/pkg/innerring/invoke/neofs.go index e7ad54dc..b33a040a 100644 --- a/pkg/innerring/invoke/neofs.go +++ b/pkg/innerring/invoke/neofs.go @@ -74,23 +74,24 @@ func CashOutCheque(cli *client.Client, con util.Uint160, p *ChequeParams) error ) } -// InnerRingIndex returns index of the `key` in the inner ring list from sidechain. -// If key is not in the inner ring list, then returns `-1`. -func InnerRingIndex(cli *client.Client, con util.Uint160, key *ecdsa.PublicKey) (int32, error) { +// InnerRingIndex returns index of the `key` in the inner ring list from sidechain +// along with total size of inner ring list. If key is not in the inner ring list, +// then returns `-1` as index. +func InnerRingIndex(cli *client.Client, con util.Uint160, key *ecdsa.PublicKey) (int32, int32, error) { if cli == nil { - return 0, client.ErrNilClient + return 0, 0, client.ErrNilClient } nodePublicKey := crypto.MarshalPublicKey(key) data, err := cli.TestInvoke(con, innerRingListMethod) if err != nil { - return 0, err + return 0, 0, err } irNodes, err := client.ArrayFromStackItem(data[0]) if err != nil { - return 0, err + return 0, 0, err } var result int32 = -1 @@ -98,12 +99,12 @@ func InnerRingIndex(cli *client.Client, con util.Uint160, key *ecdsa.PublicKey) for i := range irNodes { key, err := client.ArrayFromStackItem(irNodes[i]) if err != nil { - return 0, err + return 0, 0, err } keyValue, err := client.BytesFromStackItem(key[0]) if err != nil { - return 0, err + return 0, 0, err } if bytes.Equal(keyValue, nodePublicKey) { @@ -112,5 +113,5 @@ func InnerRingIndex(cli *client.Client, con util.Uint160, key *ecdsa.PublicKey) } } - return result, nil + return result, int32(len(irNodes)), nil } diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index c1bbcdac..b2f1e5c4 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -1,5 +1,23 @@ package audit +import ( + "go.uber.org/zap" +) + func (ap *Processor) processStartAudit(epoch uint64) { - ap.log.Info("flushing left audit results and refilling queue") + // todo: flush left audit results from audit result cache + + containers, err := ap.selectContainersToAudit(epoch) + if err != nil { + ap.log.Error("container selection failure", + zap.String("error", err.Error())) + + return + } + + ap.log.Info("select containers for audit", zap.Int("amount", len(containers))) + + // todo: for each container get list of storage groups + // todo: for each container create audit result template in audit result cache + // todo: for each container push audit tasks into queue } diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index 32ad06d8..1ec32d5c 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -13,6 +13,7 @@ type ( // Indexer is a callback interface for inner ring global state. Indexer interface { Index() int32 + InnerRingSize() int32 } // Processor of events related with data audit. diff --git a/pkg/innerring/processors/audit/scheduler.go b/pkg/innerring/processors/audit/scheduler.go new file mode 100644 index 00000000..20a0fa05 --- /dev/null +++ b/pkg/innerring/processors/audit/scheduler.go @@ -0,0 +1,60 @@ +package audit + +import ( + "sort" + "strings" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" + "github.com/pkg/errors" +) + +var ErrInvalidIRNode = errors.New("node is not in the inner ring list") + +func (ap *Processor) selectContainersToAudit(epoch uint64) ([]*container.ID, error) { + containers, err := invoke.ListContainers(ap.morphClient, ap.containerContract) + if err != nil { + return nil, errors.Wrap(err, "can't get list of containers to start audit") + } + + // consider getting extra information about container complexity from + // audit contract there + + sort.Slice(containers, func(i, j int) bool { + return strings.Compare(containers[i].String(), containers[j].String()) < 0 + }) + + ind := ap.irList.Index() + irSize := ap.irList.InnerRingSize() + + if ind < 0 || ind >= irSize { + return nil, ErrInvalidIRNode + } + + return Select(containers, epoch, uint64(ind), uint64(irSize)), nil +} + +func Select(ids []*container.ID, epoch, index, size uint64) []*container.ID { + if index >= size { + return nil + } + + var a, b uint64 + + ln := uint64(len(ids)) + pivot := ln % size + delta := ln / size + + index = (index + epoch) % size + if index < pivot { + a = delta + 1 + } else { + a = delta + b = pivot + } + + from := a*index + b + to := a*(index+1) + b + + return ids[from:to] +} diff --git a/pkg/innerring/processors/audit/scheduler_test.go b/pkg/innerring/processors/audit/scheduler_test.go new file mode 100644 index 00000000..61a9848a --- /dev/null +++ b/pkg/innerring/processors/audit/scheduler_test.go @@ -0,0 +1,113 @@ +package audit_test + +import ( + "crypto/rand" + "crypto/sha256" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" + "github.com/stretchr/testify/require" +) + +func TestSelect(t *testing.T) { + cids := generateContainers(10) + + t.Run("invalid input", func(t *testing.T) { + require.Empty(t, audit.Select(cids, 0, 0, 0)) + }) + + t.Run("even split", func(t *testing.T) { + const irSize = 5 // every node takes two audit nodes + + m := hitMap(cids) + + for i := 0; i < irSize; i++ { + s := audit.Select(cids, 0, uint64(i), irSize) + require.Equal(t, len(cids)/irSize, len(s)) + + for _, id := range s { + n, ok := m[id.String()] + require.True(t, ok) + require.Equal(t, 0, n) + m[id.String()] = 1 + } + } + + require.True(t, allHit(m)) + }) + + t.Run("odd split", func(t *testing.T) { + const irSize = 3 + + m := hitMap(cids) + + for i := 0; i < irSize; i++ { + s := audit.Select(cids, 0, uint64(i), irSize) + + for _, id := range s { + n, ok := m[id.String()] + require.True(t, ok) + require.Equal(t, 0, n) + m[id.String()] = 1 + } + } + + require.True(t, allHit(m)) + }) + + t.Run("epoch shift", func(t *testing.T) { + const irSize = 4 + + m := hitMap(cids) + + for i := 0; i < irSize; i++ { + s := audit.Select(cids, uint64(i), 0, irSize) + + for _, id := range s { + n, ok := m[id.String()] + require.True(t, ok) + require.Equal(t, 0, n) + m[id.String()] = 1 + } + } + + require.True(t, allHit(m)) + }) +} + +func generateContainers(n int) []*container.ID { + var buf [sha256.Size]byte + result := make([]*container.ID, 0, n) + + for i := 0; i < n; i++ { + _, _ = rand.Read(buf[:]) + + cid := container.NewID() + cid.SetSHA256(buf) + + result = append(result, cid) + } + + return result +} + +func hitMap(ids []*container.ID) map[string]int { + result := make(map[string]int, len(ids)) + + for _, id := range ids { + result[id.String()] = 0 + } + + return result +} + +func allHit(m map[string]int) bool { + for _, v := range m { + if v == 0 { + return false + } + } + + return true +} diff --git a/pkg/innerring/state.go b/pkg/innerring/state.go index 9e4453bf..ec0e333e 100644 --- a/pkg/innerring/state.go +++ b/pkg/innerring/state.go @@ -28,6 +28,12 @@ func (s *Server) Index() int32 { return s.innerRingIndex.Load() } +// InnerRingSize is a getter for a global size of inner ring list. This value +// paired with inner ring index. +func (s *Server) InnerRingSize() int32 { + return s.innerRingSize.Load() +} + func (s *Server) voteForSidechainValidator(validators []keys.PublicKey) error { index := s.Index() if index < 0 || index >= alphabetContractsN {