diff --git a/cmd/frostfs-adm/internal/modules/morph/balance.go b/cmd/frostfs-adm/internal/modules/morph/balance.go index ca9d939f0..f97250c38 100644 --- a/cmd/frostfs-adm/internal/modules/morph/balance.go +++ b/cmd/frostfs-adm/internal/modules/morph/balance.go @@ -44,7 +44,6 @@ const ( notaryEnabled = true ) -// nolint: funlen, gocognit func dumpBalances(cmd *cobra.Command, _ []string) error { var ( dumpStorage, _ = cmd.Flags().GetBool(dumpBalancesStorageFlag) @@ -84,86 +83,110 @@ func dumpBalances(cmd *cobra.Command, _ []string) error { printBalances(cmd, "Inner ring nodes balances:", irList) if dumpStorage { - arr, err := unwrap.Array(inv.Call(nmHash, "netmap")) - if err != nil { - return errors.New("can't fetch the list of storage nodes") - } - - snList := make([]accBalancePair, len(arr)) - for i := range arr { - node, ok := arr[i].Value().([]stackitem.Item) - if !ok || len(node) == 0 { - return errors.New("can't parse the list of storage nodes") - } - bs, err := node[0].TryBytes() - if err != nil { - return errors.New("can't parse the list of storage nodes") - } - var ni netmap.NodeInfo - if err := ni.Unmarshal(bs); err != nil { - return fmt.Errorf("can't parse the list of storage nodes: %w", err) - } - pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256()) - if err != nil { - return fmt.Errorf("can't parse storage node public key: %w", err) - } - snList[i].scriptHash = pub.GetScriptHash() - } - - if err := fetchBalances(inv, gas.Hash, snList); err != nil { + if err := printStorageNodeBalances(cmd, inv, nmHash); err != nil { return err } - printBalances(cmd, "\nStorage node balances:", snList) } if dumpProxy { - h, err := nnsResolveHash(inv, nnsCs.Hash, proxyContract+".frostfs") - if err != nil { - return fmt.Errorf("can't get hash of the proxy contract: %w", err) - } - - proxyList := []accBalancePair{{scriptHash: h}} - if err := fetchBalances(inv, gas.Hash, proxyList); err != nil { + if err := printProxyContractBalance(cmd, inv, nnsCs.Hash); err != nil { return err } - printBalances(cmd, "\nProxy contract balance:", proxyList) } if dumpAlphabet { - alphaList := make([]accBalancePair, len(irList)) - - w := io.NewBufBinWriter() - for i := range alphaList { - emit.AppCall(w.BinWriter, nnsCs.Hash, "resolve", callflag.ReadOnly, - getAlphabetNNSDomain(i), - int64(nns.TXT)) - } - if w.Err != nil { - panic(w.Err) - } - - alphaRes, err := c.InvokeScript(w.Bytes(), nil) - if err != nil { - return fmt.Errorf("can't fetch info from NNS: %w", err) - } - - for i := range alphaList { - h, err := parseNNSResolveResult(alphaRes.Stack[i]) - if err != nil { - return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err) - } - alphaList[i].scriptHash = h - } - - if err := fetchBalances(inv, gas.Hash, alphaList); err != nil { + if err := printAlphabetContractBalances(cmd, c, inv, len(irList), nnsCs.Hash); err != nil { return err } - printBalances(cmd, "\nAlphabet contracts balances:", alphaList) } return nil } +func printStorageNodeBalances(cmd *cobra.Command, inv *invoker.Invoker, nmHash util.Uint160) error { + arr, err := unwrap.Array(inv.Call(nmHash, "netmap")) + if err != nil { + return errors.New("can't fetch the list of storage nodes") + } + + snList := make([]accBalancePair, len(arr)) + for i := range arr { + node, ok := arr[i].Value().([]stackitem.Item) + if !ok || len(node) == 0 { + return errors.New("can't parse the list of storage nodes") + } + bs, err := node[0].TryBytes() + if err != nil { + return errors.New("can't parse the list of storage nodes") + } + var ni netmap.NodeInfo + if err := ni.Unmarshal(bs); err != nil { + return fmt.Errorf("can't parse the list of storage nodes: %w", err) + } + pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256()) + if err != nil { + return fmt.Errorf("can't parse storage node public key: %w", err) + } + snList[i].scriptHash = pub.GetScriptHash() + } + + if err := fetchBalances(inv, gas.Hash, snList); err != nil { + return err + } + + printBalances(cmd, "\nStorage node balances:", snList) + return nil +} + +func printProxyContractBalance(cmd *cobra.Command, inv *invoker.Invoker, nnsHash util.Uint160) error { + h, err := nnsResolveHash(inv, nnsHash, proxyContract+".frostfs") + if err != nil { + return fmt.Errorf("can't get hash of the proxy contract: %w", err) + } + + proxyList := []accBalancePair{{scriptHash: h}} + if err := fetchBalances(inv, gas.Hash, proxyList); err != nil { + return err + } + + printBalances(cmd, "\nProxy contract balance:", proxyList) + return nil +} + +func printAlphabetContractBalances(cmd *cobra.Command, c Client, inv *invoker.Invoker, count int, nnsHash util.Uint160) error { + alphaList := make([]accBalancePair, count) + + w := io.NewBufBinWriter() + for i := range alphaList { + emit.AppCall(w.BinWriter, nnsHash, "resolve", callflag.ReadOnly, + getAlphabetNNSDomain(i), + int64(nns.TXT)) + } + if w.Err != nil { + panic(w.Err) + } + + alphaRes, err := c.InvokeScript(w.Bytes(), nil) + if err != nil { + return fmt.Errorf("can't fetch info from NNS: %w", err) + } + + for i := range alphaList { + h, err := parseNNSResolveResult(alphaRes.Stack[i]) + if err != nil { + return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err) + } + alphaList[i].scriptHash = h + } + + if err := fetchBalances(inv, gas.Hash, alphaList); err != nil { + return err + } + + printBalances(cmd, "\nAlphabet contracts balances:", alphaList) + return nil +} + func fetchIRNodes(c Client, nmHash, desigHash util.Uint160) ([]accBalancePair, error) { var irList []accBalancePair diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 0d6f49eea..b69ab4890 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -51,17 +51,17 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a } // TreeApply implements the pilorama.Forest interface. -func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error { - index, lst, err := e.getTreeShard(d.CID, treeID) +func (e *StorageEngine) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error { + index, lst, err := e.getTreeShard(cnr, treeID) if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { return err } - err = lst[index].TreeApply(d, treeID, m, backgroundSync) + err = lst[index].TreeApply(cnr, treeID, m, backgroundSync) if err != nil { if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled { e.reportShardError(lst[index], "can't perform `TreeApply`", err, - zap.Stringer("cid", d.CID), + zap.Stringer("cid", cnr), zap.String("tree", treeID)) } return err diff --git a/pkg/local_object_storage/metabase/lock.go b/pkg/local_object_storage/metabase/lock.go index 65f9e669d..2e6bed936 100644 --- a/pkg/local_object_storage/metabase/lock.go +++ b/pkg/local_object_storage/metabase/lock.go @@ -138,57 +138,57 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool { // Operation is very resource-intensive, which is caused by the admissibility // of multiple locks. Also, if we knew what objects are locked, it would be // possible to speed up the execution. -// -// nolint: gocognit func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error { bucketLocked := tx.Bucket(bucketNameLocked) - if bucketLocked != nil { - key := make([]byte, cidSize) - idCnr.Encode(key) + if bucketLocked == nil { + return nil + } - bucketLockedContainer := bucketLocked.Bucket(key) - if bucketLockedContainer != nil { - keyLocker := objectKey(locker, key) - return bucketLockedContainer.ForEach(func(k, v []byte) error { - keyLockers, err := decodeList(v) - if err != nil { - return fmt.Errorf("decode list of lockers in locked bucket: %w", err) - } + key := make([]byte, cidSize) + idCnr.Encode(key) - for i := range keyLockers { - if bytes.Equal(keyLockers[i], keyLocker) { - if len(keyLockers) == 1 { - // locker was all alone - err = bucketLockedContainer.Delete(k) - if err != nil { - return fmt.Errorf("delete locked object record from locked bucket: %w", err) - } - } else { - // exclude locker - keyLockers = append(keyLockers[:i], keyLockers[i+1:]...) + bucketLockedContainer := bucketLocked.Bucket(key) + if bucketLockedContainer == nil { + return nil + } - v, err = encodeList(keyLockers) - if err != nil { - return fmt.Errorf("encode updated list of lockers: %w", err) - } + keyLocker := objectKey(locker, key) + return bucketLockedContainer.ForEach(func(k, v []byte) error { + keyLockers, err := decodeList(v) + if err != nil { + return fmt.Errorf("decode list of lockers in locked bucket: %w", err) + } - // update the record - err = bucketLockedContainer.Put(k, v) - if err != nil { - return fmt.Errorf("update list of lockers: %w", err) - } - } + for i := range keyLockers { + if bytes.Equal(keyLockers[i], keyLocker) { + if len(keyLockers) == 1 { + // locker was all alone + err = bucketLockedContainer.Delete(k) + if err != nil { + return fmt.Errorf("delete locked object record from locked bucket: %w", err) + } + } else { + // exclude locker + keyLockers = append(keyLockers[:i], keyLockers[i+1:]...) - return nil + v, err = encodeList(keyLockers) + if err != nil { + return fmt.Errorf("encode updated list of lockers: %w", err) + } + + // update the record + err = bucketLockedContainer.Put(k, v) + if err != nil { + return fmt.Errorf("update list of lockers: %w", err) } } return nil - }) + } } - } - return nil + return nil + }) } // IsLockedPrm groups the parameters of IsLocked operation. diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index b47fa16e8..42dde8607 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -327,11 +327,7 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { } // TreeApply implements the Forest interface. -func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error { - if !d.checkValid() { - return ErrInvalidCIDDescriptor - } - +func (t *boltForest) TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error { t.modeMtx.RLock() defer t.modeMtx.RUnlock() @@ -344,7 +340,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou if backgroundSync { var seen bool err := t.db.View(func(tx *bbolt.Tx) error { - treeRoot := tx.Bucket(bucketName(d.CID, treeID)) + treeRoot := tx.Bucket(bucketName(cnr, treeID)) if treeRoot == nil { return nil } @@ -362,7 +358,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou } if t.db.MaxBatchSize == 1 { - fullID := bucketName(d.CID, treeID) + fullID := bucketName(cnr, treeID) return t.db.Update(func(tx *bbolt.Tx) error { bLog, bTree, err := t.getTreeBuckets(tx, fullID) if err != nil { @@ -375,11 +371,11 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou } ch := make(chan error, 1) - t.addBatch(d, treeID, m, ch) + t.addBatch(cnr, treeID, m, ch) return <-ch } -func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) { +func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) { t.mtx.Lock() for i := 0; i < len(t.batches); i++ { t.batches[i].mtx.Lock() @@ -391,7 +387,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e continue } - found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID + found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID if found { t.batches[i].results = append(t.batches[i].results, ch) t.batches[i].operations = append(t.batches[i].operations, m) @@ -412,7 +408,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e } b := &batch{ forest: t, - cid: d.CID, + cid: cnr, treeID: treeID, results: []chan<- error{ch}, operations: []*Move{m}, diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 743096c81..21209420a 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -5,6 +5,7 @@ import ( "strings" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" ) @@ -93,12 +94,8 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string } // TreeApply implements the Forest interface. -func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move, _ bool) error { - if !d.checkValid() { - return ErrInvalidCIDDescriptor - } - - fullID := d.CID.String() + "/" + treeID +func (f *memoryForest) TreeApply(cnr cid.ID, treeID string, op *Move, _ bool) error { + fullID := cnr.String() + "/" + treeID s, ok := f.treeMap[fullID] if !ok { s = newState() diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index 9fe372b36..8e6f12717 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -411,21 +411,10 @@ func TestForest_Apply(t *testing.T) { func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) { cid := cidtest.ID() - d := CIDDescriptor{cid, 0, 1} treeID := "version" - t.Run("invalid descriptor", func(t *testing.T) { - s := constructor(t) - err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{ - Child: 10, - Parent: 0, - Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}}, - }, false) - require.ErrorIs(t, err, ErrInvalidCIDDescriptor) - }) - testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) { - require.NoError(t, s.TreeApply(d, treeID, &Move{ + require.NoError(t, s.TreeApply(cid, treeID, &Move{ Child: child, Parent: parent, Meta: meta, @@ -465,7 +454,6 @@ func TestForest_GetOpLog(t *testing.T) { func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) { cid := cidtest.ID() - d := CIDDescriptor{cid, 0, 1} treeID := "version" logs := []Move{ { @@ -491,7 +479,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Op }) for i := range logs { - require.NoError(t, s.TreeApply(d, treeID, &logs[i], false)) + require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false)) } testGetOpLog := func(t *testing.T, height uint64, m Move) { @@ -533,13 +521,12 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O cid := cidtest.ID() treeID := "version" - d := CIDDescriptor{cid, 0, 1} t.Run("empty state, no panic", func(t *testing.T) { checkExists(t, false, cid, treeID) }) - require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1}, false)) + require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false)) checkExists(t, true, cid, treeID) checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree checkExists(t, false, cid, "another tree") // same CID, different tree @@ -570,16 +557,16 @@ func TestApplyTricky1(t *testing.T) { } treeID := "version" - d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1} + cid := cidtest.ID() for i := range providers { t.Run(providers[i].name, func(t *testing.T) { s := providers[i].construct(t) for i := range ops { - require.NoError(t, s.TreeApply(d, treeID, &ops[i], false)) + require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false)) } for i := range expected { - _, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child) + _, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child) require.NoError(t, err) require.Equal(t, expected[i].parent, parent) } @@ -631,16 +618,16 @@ func TestApplyTricky2(t *testing.T) { } treeID := "version" - d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1} + cid := cidtest.ID() for i := range providers { t.Run(providers[i].name, func(t *testing.T) { s := providers[i].construct(t) for i := range ops { - require.NoError(t, s.TreeApply(d, treeID, &ops[i], false)) + require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false)) } for i := range expected { - _, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child) + _, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child) require.NoError(t, err) require.Equal(t, expected[i].parent, parent) } @@ -746,12 +733,11 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ ops := prepareRandomTree(nodeCount, opCount) cid := cidtest.ID() - d := CIDDescriptor{cid, 0, 1} treeID := "version" expected := constructor(t) for i := range ops { - require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false)) + require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false)) } for i := 0; i < iterCount; i++ { @@ -766,7 +752,7 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ go func() { defer wg.Done() for op := range ch { - require.NoError(t, actual.TreeApply(d, treeID, op, false)) + require.NoError(t, actual.TreeApply(cid, treeID, op, false)) } }() } @@ -792,12 +778,11 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ .. ops := prepareRandomTree(nodeCount, opCount) cid := cidtest.ID() - d := CIDDescriptor{cid, 0, 1} treeID := "version" expected := constructor(t) for i := range ops { - require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false)) + require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false)) } const iterCount = 200 @@ -807,7 +792,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ .. actual := constructor(t) for i := range ops { - require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false)) + require.NoError(t, actual.TreeApply(cid, treeID, &ops[i], false)) } compareForests(t, expected, actual, cid, treeID, nodeCount) } @@ -889,7 +874,6 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) { ops := genFunc(b.N) cid := cidtest.ID() - d := CIDDescriptor{cid, 0, 1} treeID := "version" ch := make(chan int, b.N) for i := 0; i < b.N; i++ { @@ -901,7 +885,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) { b.SetParallelism(10) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil { + if err := s.TreeApply(cid, treeID, &ops[<-ch], false); err != nil { b.Fatalf("error in `Apply`: %v", err) } } @@ -918,7 +902,6 @@ func TestTreeGetByPath(t *testing.T) { func testTreeGetByPath(t *testing.T, s Forest) { cid := cidtest.ID() - d := CIDDescriptor{cid, 0, 1} treeID := "version" // / @@ -928,12 +911,12 @@ func testTreeGetByPath(t *testing.T, s Forest) { // |- cat1.jpg, Version=XXX (4) // |- cat1.jpg, Version=YYY (5) // |- cat2.jpg, Version=ZZZ (6) - testMove(t, s, 0, 1, 0, d, treeID, "a", "") - testMove(t, s, 1, 2, 0, d, treeID, "b", "") - testMove(t, s, 2, 3, 1, d, treeID, "cat1.jpg", "TTT") - testMove(t, s, 3, 4, 2, d, treeID, "cat1.jpg", "XXX") - testMove(t, s, 4, 5, 2, d, treeID, "cat1.jpg", "YYY") - testMove(t, s, 5, 6, 2, d, treeID, "cat2.jpg", "ZZZ") + testMove(t, s, 0, 1, 0, cid, treeID, "a", "") + testMove(t, s, 1, 2, 0, cid, treeID, "b", "") + testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT") + testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX") + testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY") + testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ") if mf, ok := s.(*memoryForest); ok { single := mf.treeMap[cid.String()+"/"+treeID] @@ -970,14 +953,14 @@ func testTreeGetByPath(t *testing.T, s Forest) { }) } -func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor, treeID, filename, version string) { +func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) { items := make([]KeyValue, 1, 2) items[0] = KeyValue{AttributeFilename, []byte(filename)} if version != "" { items = append(items, KeyValue{AttributeVersion, []byte(version)}) } - require.NoError(t, s.TreeApply(d, treeID, &Move{ + require.NoError(t, s.TreeApply(cid, treeID, &Move{ Parent: parent, Child: node, Meta: Meta{ diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index c2143de24..290f633a5 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -18,7 +18,7 @@ type Forest interface { TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) // TreeApply applies replicated operation from another node. // If background is true, TreeApply will first check whether an operation exists. - TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error + TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error // TreeGetByPath returns all nodes corresponding to the path. // The path is constructed by descending from the root using the values of the // AttributeFilename in meta. diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index acf038d71..5aa7fbd3d 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -155,7 +155,6 @@ func (s *Shard) Init() error { return nil } -// nolint: funlen func (s *Shard) refillMetabase() error { err := s.metaBase.Reset() if err != nil { @@ -172,57 +171,23 @@ func (s *Shard) refillMetabase() error { return nil } - // nolint: exhaustive + var err error switch obj.Type() { case objectSDK.TypeTombstone: - tombstone := objectSDK.NewTombstone() - - if err := tombstone.Unmarshal(obj.Payload()); err != nil { - return fmt.Errorf("could not unmarshal tombstone content: %w", err) - } - - tombAddr := object.AddressOf(obj) - memberIDs := tombstone.Members() - tombMembers := make([]oid.Address, 0, len(memberIDs)) - - for i := range memberIDs { - a := tombAddr - a.SetObject(memberIDs[i]) - - tombMembers = append(tombMembers, a) - } - - var inhumePrm meta.InhumePrm - - inhumePrm.SetTombstoneAddress(tombAddr) - inhumePrm.SetAddresses(tombMembers...) - - _, err = s.metaBase.Inhume(inhumePrm) - if err != nil { - return fmt.Errorf("could not inhume objects: %w", err) - } + err = s.refillTombstoneObject(obj) case objectSDK.TypeLock: - var lock objectSDK.Lock - if err := lock.Unmarshal(obj.Payload()); err != nil { - return fmt.Errorf("could not unmarshal lock content: %w", err) - } - - locked := make([]oid.ID, lock.NumberOfMembers()) - lock.ReadMembers(locked) - - cnr, _ := obj.ContainerID() - id, _ := obj.ID() - err = s.metaBase.Lock(cnr, id, locked) - if err != nil { - return fmt.Errorf("could not lock objects: %w", err) - } + err = s.refillLockObject(obj) + default: + } + if err != nil { + return err } var mPrm meta.PutPrm mPrm.SetObject(obj) mPrm.SetStorageID(descriptor) - _, err := s.metaBase.Put(mPrm) + _, err = s.metaBase.Put(mPrm) if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { return err } @@ -241,6 +206,54 @@ func (s *Shard) refillMetabase() error { return nil } +func (s *Shard) refillLockObject(obj *objectSDK.Object) error { + var lock objectSDK.Lock + if err := lock.Unmarshal(obj.Payload()); err != nil { + return fmt.Errorf("could not unmarshal lock content: %w", err) + } + + locked := make([]oid.ID, lock.NumberOfMembers()) + lock.ReadMembers(locked) + + cnr, _ := obj.ContainerID() + id, _ := obj.ID() + err := s.metaBase.Lock(cnr, id, locked) + if err != nil { + return fmt.Errorf("could not lock objects: %w", err) + } + return nil +} + +func (s *Shard) refillTombstoneObject(obj *objectSDK.Object) error { + tombstone := objectSDK.NewTombstone() + + if err := tombstone.Unmarshal(obj.Payload()); err != nil { + return fmt.Errorf("could not unmarshal tombstone content: %w", err) + } + + tombAddr := object.AddressOf(obj) + memberIDs := tombstone.Members() + tombMembers := make([]oid.Address, 0, len(memberIDs)) + + for i := range memberIDs { + a := tombAddr + a.SetObject(memberIDs[i]) + + tombMembers = append(tombMembers, a) + } + + var inhumePrm meta.InhumePrm + + inhumePrm.SetTombstoneAddress(tombAddr) + inhumePrm.SetAddresses(tombMembers...) + + _, err := s.metaBase.Inhume(inhumePrm) + if err != nil { + return fmt.Errorf("could not inhume objects: %w", err) + } + return nil +} + // Close releases all Shard's components. func (s *Shard) Close() error { components := []interface{ Close() error }{} diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go index b9f909997..684c92e66 100644 --- a/pkg/local_object_storage/shard/tree.go +++ b/pkg/local_object_storage/shard/tree.go @@ -42,7 +42,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri } // TreeApply implements the pilorama.Forest interface. -func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error { +func (s *Shard) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error { if s.pilorama == nil { return ErrPiloramaDisabled } @@ -53,7 +53,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M if s.info.Mode.ReadOnly() { return ErrReadOnlyMode } - return s.pilorama.TreeApply(d, treeID, m, backgroundSync) + return s.pilorama.TreeApply(cnr, treeID, m, backgroundSync) } // TreeGetByPath implements the pilorama.Forest interface. diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index e35cece36..a871714a1 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -426,7 +426,7 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object) return nil } -// nolint: funlen, gocognit +// nolint: funlen func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) { body := req.GetBody() @@ -459,138 +459,140 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp body: resp.GetBody(), }) - if !commonPrm.LocalOnly() { - var onceResign sync.Once - - p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { - var err error - - key, err := s.keyStorage.GetKey(nil) - if err != nil { - return nil, err - } - - // once compose and resign forwarding request - onceResign.Do(func() { - // compose meta header of the local server - metaHdr := new(session.RequestMetaHeader) - metaHdr.SetTTL(meta.GetTTL() - 1) - // TODO: #1165 think how to set the other fields - metaHdr.SetOrigin(meta) - writeCurrentVersion(metaHdr) - - req.SetMetaHeader(metaHdr) - - err = signature.SignServiceMessage(key, req) - }) - - if err != nil { - return nil, err - } - - // code below is copy-pasted from c.GetObjectHeader implementation, - // perhaps it is worth highlighting the utility function in frostfs-api-go - - // send Head request - var headResp *objectV2.HeadResponse - err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { - headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx)) - return err - }) - if err != nil { - return nil, fmt.Errorf("sending the request failed: %w", err) - } - - // verify response key - if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil { - return nil, err - } - - // verify response structure - if err := signature.VerifyServiceMessage(headResp); err != nil { - return nil, fmt.Errorf("response verification failed: %w", err) - } - - if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return nil, err - } - - var ( - hdr *objectV2.Header - idSig *refs.Signature - ) - - switch v := headResp.GetBody().GetHeaderPart().(type) { - case nil: - return nil, fmt.Errorf("unexpected header type %T", v) - case *objectV2.ShortHeader: - if !body.GetMainOnly() { - return nil, fmt.Errorf("wrong header part type: expected %T, received %T", - (*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil), - ) - } - - h := v - - hdr = new(objectV2.Header) - hdr.SetPayloadLength(h.GetPayloadLength()) - hdr.SetVersion(h.GetVersion()) - hdr.SetOwnerID(h.GetOwnerID()) - hdr.SetObjectType(h.GetObjectType()) - hdr.SetCreationEpoch(h.GetCreationEpoch()) - hdr.SetPayloadHash(h.GetPayloadHash()) - hdr.SetHomomorphicHash(h.GetHomomorphicHash()) - case *objectV2.HeaderWithSignature: - if body.GetMainOnly() { - return nil, fmt.Errorf("wrong header part type: expected %T, received %T", - (*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil), - ) - } - - hdrWithSig := v - if hdrWithSig == nil { - return nil, errors.New("nil object part") - } - - hdr = hdrWithSig.GetHeader() - idSig = hdrWithSig.GetSignature() - - if idSig == nil { - // TODO(@cthulhu-rider): #1387 use "const" error - return nil, errors.New("missing signature") - } - - binID, err := objAddr.Object().Marshal() - if err != nil { - return nil, fmt.Errorf("marshal ID: %w", err) - } - - var sig frostfscrypto.Signature - if err := sig.ReadFromV2(*idSig); err != nil { - return nil, fmt.Errorf("can't read signature: %w", err) - } - - if !sig.Verify(binID) { - return nil, errors.New("invalid object ID signature") - } - case *objectV2.SplitInfo: - si := object.NewSplitInfoFromV2(v) - - return nil, object.NewSplitInfoError(si) - } - - objv2 := new(objectV2.Object) - objv2.SetHeader(hdr) - objv2.SetSignature(idSig) - - obj := object.NewFromV2(objv2) - obj.SetID(objAddr.Object()) - - // convert the object - return obj, nil - })) + if commonPrm.LocalOnly() { + return p, nil } + var onceResign sync.Once + + p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { + var err error + + key, err := s.keyStorage.GetKey(nil) + if err != nil { + return nil, err + } + + // once compose and resign forwarding request + onceResign.Do(func() { + // compose meta header of the local server + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(meta.GetTTL() - 1) + // TODO: #1165 think how to set the other fields + metaHdr.SetOrigin(meta) + writeCurrentVersion(metaHdr) + + req.SetMetaHeader(metaHdr) + + err = signature.SignServiceMessage(key, req) + }) + + if err != nil { + return nil, err + } + + // code below is copy-pasted from c.GetObjectHeader implementation, + // perhaps it is worth highlighting the utility function in frostfs-api-go + + // send Head request + var headResp *objectV2.HeadResponse + err = c.RawForAddress(addr, func(cli *rpcclient.Client) error { + headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx)) + return err + }) + if err != nil { + return nil, fmt.Errorf("sending the request failed: %w", err) + } + + // verify response key + if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil { + return nil, err + } + + // verify response structure + if err := signature.VerifyServiceMessage(headResp); err != nil { + return nil, fmt.Errorf("response verification failed: %w", err) + } + + if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { + return nil, err + } + + var ( + hdr *objectV2.Header + idSig *refs.Signature + ) + + switch v := headResp.GetBody().GetHeaderPart().(type) { + case nil: + return nil, fmt.Errorf("unexpected header type %T", v) + case *objectV2.ShortHeader: + if !body.GetMainOnly() { + return nil, fmt.Errorf("wrong header part type: expected %T, received %T", + (*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil), + ) + } + + h := v + + hdr = new(objectV2.Header) + hdr.SetPayloadLength(h.GetPayloadLength()) + hdr.SetVersion(h.GetVersion()) + hdr.SetOwnerID(h.GetOwnerID()) + hdr.SetObjectType(h.GetObjectType()) + hdr.SetCreationEpoch(h.GetCreationEpoch()) + hdr.SetPayloadHash(h.GetPayloadHash()) + hdr.SetHomomorphicHash(h.GetHomomorphicHash()) + case *objectV2.HeaderWithSignature: + if body.GetMainOnly() { + return nil, fmt.Errorf("wrong header part type: expected %T, received %T", + (*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil), + ) + } + + hdrWithSig := v + if hdrWithSig == nil { + return nil, errors.New("nil object part") + } + + hdr = hdrWithSig.GetHeader() + idSig = hdrWithSig.GetSignature() + + if idSig == nil { + // TODO(@cthulhu-rider): #1387 use "const" error + return nil, errors.New("missing signature") + } + + binID, err := objAddr.Object().Marshal() + if err != nil { + return nil, fmt.Errorf("marshal ID: %w", err) + } + + var sig frostfscrypto.Signature + if err := sig.ReadFromV2(*idSig); err != nil { + return nil, fmt.Errorf("can't read signature: %w", err) + } + + if !sig.Verify(binID) { + return nil, errors.New("invalid object ID signature") + } + case *objectV2.SplitInfo: + si := object.NewSplitInfoFromV2(v) + + return nil, object.NewSplitInfoError(si) + } + + objv2 := new(objectV2.Object) + objv2.SetHeader(hdr) + objv2.SetSignature(idSig) + + obj := object.NewFromV2(objv2) + obj.SetID(objAddr.Object()) + + // convert the object + return obj, nil + })) + return p, nil } diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 01bd2debd..bb20310b2 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -27,7 +27,7 @@ type replicationTask struct { type applyOp struct { treeID string - pilorama.CIDDescriptor + cid cidSDK.ID pilorama.Move } @@ -43,7 +43,7 @@ func (s *Service) localReplicationWorker() { case <-s.closeCh: return case op := <-s.replicateLocalCh: - err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false) + err := s.forest.TreeApply(op.cid, op.treeID, &op.Move, false) if err != nil { s.log.Error("failed to apply replicated operation", zap.String("err", err.Error())) @@ -52,7 +52,7 @@ func (s *Service) localReplicationWorker() { } } -func (s *Service) replicationWorker() { +func (s *Service) replicationWorker(ctx context.Context) { for { select { case <-s.closeCh: @@ -64,13 +64,13 @@ func (s *Service) replicationWorker() { task.n.IterateNetworkEndpoints(func(addr string) bool { lastAddr = addr - c, err := s.cache.get(context.Background(), addr) + c, err := s.cache.get(ctx, addr) if err != nil { lastErr = fmt.Errorf("can't create client: %w", err) return false } - ctx, cancel := context.WithTimeout(context.Background(), s.replicatorTimeout) + ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout) _, lastErr = c.Apply(ctx, task.req) cancel() @@ -94,8 +94,7 @@ func (s *Service) replicationWorker() { func (s *Service) replicateLoop(ctx context.Context) { for i := 0; i < s.replicatorWorkerCount; i++ { - //nolint: contextcheck - go s.replicationWorker() + go s.replicationWorker(ctx) go s.localReplicationWorker() } defer func() { diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 3176858e2..edea450f1 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -468,7 +468,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e key := req.GetSignature().GetKey() - _, pos, size, err := s.getContainerInfo(cid, key) + _, pos, _, err := s.getContainerInfo(cid, key) if err != nil { return nil, err } @@ -485,8 +485,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e select { case s.replicateLocalCh <- applyOp{ - treeID: req.GetBody().GetTreeId(), - CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}, + treeID: req.GetBody().GetTreeId(), + cid: cid, Move: pilorama.Move{ Parent: op.GetParentId(), Child: op.GetChildId(), diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index abd683b77..d4ef7df5d 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -14,6 +14,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/panjf2000/ants/v2" "go.uber.org/zap" @@ -40,11 +41,6 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error { return ErrNotInContainer } - var d pilorama.CIDDescriptor - d.CID = cid - d.Position = pos - d.Size = len(nodes) - nodes = randomizeNodeOrder(nodes, pos) if len(nodes) == 0 { return nil @@ -87,18 +83,18 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error { } for _, tid := range treesToSync { - h, err := s.forest.TreeLastSyncHeight(d.CID, tid) + h, err := s.forest.TreeLastSyncHeight(cid, tid) if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { s.log.Warn("could not get last synchronized height for a tree", - zap.Stringer("cid", d.CID), + zap.Stringer("cid", cid), zap.String("tree", tid)) continue } - newHeight := s.synchronizeTree(ctx, d, h, tid, nodes) + newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes) if h < newHeight { - if err := s.forest.TreeUpdateLastSyncHeight(d.CID, tid, newHeight); err != nil { + if err := s.forest.TreeUpdateLastSyncHeight(cid, tid, newHeight); err != nil { s.log.Warn("could not update last synchronized height for a tree", - zap.Stringer("cid", d.CID), + zap.Stringer("cid", cid), zap.String("tree", tid)) } } @@ -118,24 +114,19 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string return ErrNotInContainer } - var d pilorama.CIDDescriptor - d.CID = cid - d.Position = pos - d.Size = len(nodes) - nodes = randomizeNodeOrder(nodes, pos) if len(nodes) == 0 { return nil } - s.synchronizeTree(ctx, d, 0, treeID, nodes) + s.synchronizeTree(ctx, cid, 0, treeID, nodes) return nil } -func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, from uint64, +func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint64, treeID string, nodes []netmapSDK.NodeInfo) uint64 { s.log.Debug("synchronize tree", - zap.Stringer("cid", d.CID), + zap.Stringer("cid", cid), zap.String("tree", treeID), zap.Uint64("from", from)) @@ -157,7 +148,7 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, treeClient := NewTreeServiceClient(cc) for { - h, err := s.synchronizeSingle(ctx, d, treeID, height, treeClient) + h, err := s.synchronizeSingle(ctx, cid, treeID, height, treeClient) if height < h { height = h } @@ -179,9 +170,9 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, return newHeight } -func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescriptor, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) { +func (s *Service) synchronizeSingle(ctx context.Context, cid cidSDK.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) { rawCID := make([]byte, sha256.Size) - d.CID.Encode(rawCID) + cid.Encode(rawCID) for { newHeight := height @@ -211,7 +202,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto if err := m.Meta.FromBytes(lm.Meta); err != nil { return newHeight, err } - if err := s.forest.TreeApply(d, treeID, m, true); err != nil { + if err := s.forest.TreeApply(cid, treeID, m, true); err != nil { return newHeight, err } if m.Time > newHeight { @@ -255,7 +246,6 @@ func (s *Service) SynchronizeAll() error { } } -// nolint: funlen, gocognit func (s *Service) syncLoop(ctx context.Context) { for { select { @@ -272,86 +262,99 @@ func (s *Service) syncLoop(ctx context.Context) { continue } - newMap := make(map[cid.ID]struct{}, len(s.cnrMap)) - cnrsToSync := make([]cid.ID, 0, len(cnrs)) + newMap, cnrsToSync := s.containersToSync(cnrs) - var removed []cid.ID - for _, cnr := range cnrs { - _, pos, err := s.getContainerNodes(cnr) - if err != nil { - s.log.Error("could not calculate container nodes", - zap.Stringer("cid", cnr), - zap.Error(err)) - continue - } + s.syncContainers(ctx, cnrsToSync) - if pos < 0 { - // node is not included in the container. - continue - } - - newMap[cnr] = struct{}{} - cnrsToSync = append(cnrsToSync, cnr) - } - - // sync new containers - var wg sync.WaitGroup - for _, cnr := range cnrsToSync { - wg.Add(1) - cnr := cnr - err := s.syncPool.Submit(func() { - defer wg.Done() - s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr)) - - err := s.synchronizeAllTrees(ctx, cnr) - if err != nil { - s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err)) - return - } - - s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr)) - }) - if err != nil { - wg.Done() - s.log.Error("could not query trees for synchronization", - zap.Stringer("cid", cnr), - zap.Error(err)) - if errors.Is(err, ants.ErrPoolClosed) { - return - } - } - } - wg.Wait() - - s.cnrMapMtx.Lock() - for cnr := range s.cnrMap { - if _, ok := newMap[cnr]; ok { - continue - } - removed = append(removed, cnr) - } - for i := range removed { - delete(s.cnrMap, removed[i]) - } - s.cnrMapMtx.Unlock() - - for _, cnr := range removed { - s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr)) - - err = s.DropTree(ctx, cnr, "") - if err != nil { - s.log.Error("could not remove redundant tree", - zap.Stringer("cid", cnr), - zap.Error(err)) - continue - } - } + s.removeContainers(ctx, newMap) s.log.Debug("trees have been synchronized") } } } +func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) { + // sync new containers + var wg sync.WaitGroup + for _, cnr := range cnrs { + wg.Add(1) + cnr := cnr + err := s.syncPool.Submit(func() { + defer wg.Done() + s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr)) + + err := s.synchronizeAllTrees(ctx, cnr) + if err != nil { + s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err)) + return + } + + s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr)) + }) + if err != nil { + wg.Done() + s.log.Error("could not query trees for synchronization", + zap.Stringer("cid", cnr), + zap.Error(err)) + if errors.Is(err, ants.ErrPoolClosed) { + return + } + } + } + wg.Wait() +} + +func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) { + s.cnrMapMtx.Lock() + defer s.cnrMapMtx.Unlock() + + var removed []cid.ID + for cnr := range s.cnrMap { + if _, ok := newContainers[cnr]; ok { + continue + } + removed = append(removed, cnr) + } + for i := range removed { + delete(s.cnrMap, removed[i]) + } + + for _, cnr := range removed { + s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr)) + + err := s.DropTree(ctx, cnr, "") + if err != nil { + s.log.Error("could not remove redundant tree", + zap.Stringer("cid", cnr), + zap.Error(err)) + } + } +} + +func (s *Service) containersToSync(cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) { + newMap := make(map[cid.ID]struct{}, len(s.cnrMap)) + cnrsToSync := make([]cid.ID, 0, len(cnrs)) + + for _, cnr := range cnrs { + _, pos, err := s.getContainerNodes(cnr) + if err != nil { + s.log.Error("could not calculate container nodes", + zap.Stringer("cid", cnr), + zap.Error(err)) + continue + } + + if pos < 0 { + // node is not included in the container. + continue + } + + newMap[cnr] = struct{}{} + cnrsToSync = append(cnrsToSync, cnr) + } + return newMap, cnrsToSync +} + // randomizeNodeOrder shuffles nodes and removes not a `pos` index. // It is assumed that 0 <= pos < len(nodes). func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {