Fix some of the linter exceptions #156

Merged
fyrchik merged 7 commits from fyrchik/frostfs-node:linter-fixes into master 2023-03-22 07:14:19 +00:00
13 changed files with 460 additions and 444 deletions

View file

@@ -44,7 +44,6 @@ const (
     notaryEnabled = true
 )
 
-// nolint: funlen, gocognit
 func dumpBalances(cmd *cobra.Command, _ []string) error {
     var (
         dumpStorage, _ = cmd.Flags().GetBool(dumpBalancesStorageFlag)
@@ -84,86 +83,110 @@ func dumpBalances(cmd *cobra.Command, _ []string) error {
     printBalances(cmd, "Inner ring nodes balances:", irList)
 
     if dumpStorage {
-        arr, err := unwrap.Array(inv.Call(nmHash, "netmap"))
-        if err != nil {
-            return errors.New("can't fetch the list of storage nodes")
-        }
-
-        snList := make([]accBalancePair, len(arr))
-        for i := range arr {
-            node, ok := arr[i].Value().([]stackitem.Item)
-            if !ok || len(node) == 0 {
-                return errors.New("can't parse the list of storage nodes")
-            }
-            bs, err := node[0].TryBytes()
-            if err != nil {
-                return errors.New("can't parse the list of storage nodes")
-            }
-            var ni netmap.NodeInfo
-            if err := ni.Unmarshal(bs); err != nil {
-                return fmt.Errorf("can't parse the list of storage nodes: %w", err)
-            }
-            pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256())
-            if err != nil {
-                return fmt.Errorf("can't parse storage node public key: %w", err)
-            }
-            snList[i].scriptHash = pub.GetScriptHash()
-        }
-
-        if err := fetchBalances(inv, gas.Hash, snList); err != nil {
+        if err := printStorageNodeBalances(cmd, inv, nmHash); err != nil {
             return err
         }
-        printBalances(cmd, "\nStorage node balances:", snList)
     }
 
     if dumpProxy {
-        h, err := nnsResolveHash(inv, nnsCs.Hash, proxyContract+".frostfs")
-        if err != nil {
-            return fmt.Errorf("can't get hash of the proxy contract: %w", err)
-        }
-
-        proxyList := []accBalancePair{{scriptHash: h}}
-        if err := fetchBalances(inv, gas.Hash, proxyList); err != nil {
+        if err := printProxyContractBalance(cmd, inv, nnsCs.Hash); err != nil {
             return err
         }
-        printBalances(cmd, "\nProxy contract balance:", proxyList)
     }
 
     if dumpAlphabet {
-        alphaList := make([]accBalancePair, len(irList))
-        w := io.NewBufBinWriter()
-        for i := range alphaList {
-            emit.AppCall(w.BinWriter, nnsCs.Hash, "resolve", callflag.ReadOnly,
-                getAlphabetNNSDomain(i),
-                int64(nns.TXT))
-        }
-        if w.Err != nil {
-            panic(w.Err)
-        }
-
-        alphaRes, err := c.InvokeScript(w.Bytes(), nil)
-        if err != nil {
-            return fmt.Errorf("can't fetch info from NNS: %w", err)
-        }
-        for i := range alphaList {
-            h, err := parseNNSResolveResult(alphaRes.Stack[i])
-            if err != nil {
-                return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err)
-            }
-            alphaList[i].scriptHash = h
-        }
-
-        if err := fetchBalances(inv, gas.Hash, alphaList); err != nil {
+        if err := printAlphabetContractBalances(cmd, c, inv, len(irList), nnsCs.Hash); err != nil {
             return err
         }
-        printBalances(cmd, "\nAlphabet contracts balances:", alphaList)
     }
 
     return nil
 }
 
+func printStorageNodeBalances(cmd *cobra.Command, inv *invoker.Invoker, nmHash util.Uint160) error {
+    arr, err := unwrap.Array(inv.Call(nmHash, "netmap"))
+    if err != nil {
+        return errors.New("can't fetch the list of storage nodes")
+    }
+
+    snList := make([]accBalancePair, len(arr))
+    for i := range arr {
+        node, ok := arr[i].Value().([]stackitem.Item)
+        if !ok || len(node) == 0 {
+            return errors.New("can't parse the list of storage nodes")
+        }
+        bs, err := node[0].TryBytes()
+        if err != nil {
+            return errors.New("can't parse the list of storage nodes")
+        }
+        var ni netmap.NodeInfo
+        if err := ni.Unmarshal(bs); err != nil {
+            return fmt.Errorf("can't parse the list of storage nodes: %w", err)
+        }
+        pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256())
+        if err != nil {
+            return fmt.Errorf("can't parse storage node public key: %w", err)
+        }
+        snList[i].scriptHash = pub.GetScriptHash()
+    }
+
+    if err := fetchBalances(inv, gas.Hash, snList); err != nil {
+        return err
+    }
+    printBalances(cmd, "\nStorage node balances:", snList)
+    return nil
+}
+
+func printProxyContractBalance(cmd *cobra.Command, inv *invoker.Invoker, nnsHash util.Uint160) error {
+    h, err := nnsResolveHash(inv, nnsHash, proxyContract+".frostfs")
+    if err != nil {
+        return fmt.Errorf("can't get hash of the proxy contract: %w", err)
+    }
+
+    proxyList := []accBalancePair{{scriptHash: h}}
+    if err := fetchBalances(inv, gas.Hash, proxyList); err != nil {
+        return err
+    }
+    printBalances(cmd, "\nProxy contract balance:", proxyList)
+    return nil
+}
+
+func printAlphabetContractBalances(cmd *cobra.Command, c Client, inv *invoker.Invoker, count int, nnsHash util.Uint160) error {
+    alphaList := make([]accBalancePair, count)
+
+    w := io.NewBufBinWriter()
+    for i := range alphaList {
+        emit.AppCall(w.BinWriter, nnsHash, "resolve", callflag.ReadOnly,
+            getAlphabetNNSDomain(i),
+            int64(nns.TXT))
+    }
+    if w.Err != nil {
+        panic(w.Err)
+    }
+
+    alphaRes, err := c.InvokeScript(w.Bytes(), nil)
+    if err != nil {
+        return fmt.Errorf("can't fetch info from NNS: %w", err)
+    }
+    for i := range alphaList {
+        h, err := parseNNSResolveResult(alphaRes.Stack[i])
+        if err != nil {
+            return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err)
+        }
+        alphaList[i].scriptHash = h
+    }
+
+    if err := fetchBalances(inv, gas.Hash, alphaList); err != nil {
+        return err
+    }
+    printBalances(cmd, "\nAlphabet contracts balances:", alphaList)
+    return nil
+}
+
 func fetchIRNodes(c Client, nmHash, desigHash util.Uint160) ([]accBalancePair, error) {
     var irList []accBalancePair

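Reviewer note: the net effect of this hunk is easier to see without the diff markers — dumpBalances now only dispatches to the three helpers added above. A condensed sketch of the new control flow (it assumes the same package as those helpers; the wrapper name and parameter list are invented for illustration, standing in for values dumpBalances already has in scope):

// dumpSelectedBalances mirrors the refactored dumpBalances body: each section
// is delegated to one helper, so the command function stays under the funlen
// and gocognit thresholds without a // nolint comment.
func dumpSelectedBalances(cmd *cobra.Command, c Client, inv *invoker.Invoker,
    nmHash, nnsHash util.Uint160, irCount int,
    dumpStorage, dumpProxy, dumpAlphabet bool) error {
    if dumpStorage {
        if err := printStorageNodeBalances(cmd, inv, nmHash); err != nil {
            return err
        }
    }
    if dumpProxy {
        if err := printProxyContractBalance(cmd, inv, nnsHash); err != nil {
            return err
        }
    }
    if dumpAlphabet {
        if err := printAlphabetContractBalances(cmd, c, inv, irCount, nnsHash); err != nil {
            return err
        }
    }
    return nil
}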
View file

@@ -51,17 +51,17 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a
 }
 
 // TreeApply implements the pilorama.Forest interface.
-func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
-    index, lst, err := e.getTreeShard(d.CID, treeID)
+func (e *StorageEngine) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
+    index, lst, err := e.getTreeShard(cnr, treeID)
     if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
         return err
     }
 
-    err = lst[index].TreeApply(d, treeID, m, backgroundSync)
+    err = lst[index].TreeApply(cnr, treeID, m, backgroundSync)
     if err != nil {
         if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
             e.reportShardError(lst[index], "can't perform `TreeApply`", err,
-                zap.Stringer("cid", d.CID),
+                zap.Stringer("cid", cnr),
                 zap.String("tree", treeID))
         }
         return err

View file

@@ -138,57 +138,57 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
 // Operation is very resource-intensive, which is caused by the admissibility
 // of multiple locks. Also, if we knew what objects are locked, it would be
 // possible to speed up the execution.
-//
-// nolint: gocognit
 func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
     bucketLocked := tx.Bucket(bucketNameLocked)
-    if bucketLocked != nil {
-        key := make([]byte, cidSize)
-        idCnr.Encode(key)
-
-        bucketLockedContainer := bucketLocked.Bucket(key)
-        if bucketLockedContainer != nil {
-            keyLocker := objectKey(locker, key)
-            return bucketLockedContainer.ForEach(func(k, v []byte) error {
+    if bucketLocked == nil {
+        return nil
+    }
+
+    key := make([]byte, cidSize)
+    idCnr.Encode(key)
+
+    bucketLockedContainer := bucketLocked.Bucket(key)
+    if bucketLockedContainer == nil {
+        return nil
+    }
+
+    keyLocker := objectKey(locker, key)
+    return bucketLockedContainer.ForEach(func(k, v []byte) error {
         keyLockers, err := decodeList(v)
         if err != nil {
            return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
        }
 
        for i := range keyLockers {
            if bytes.Equal(keyLockers[i], keyLocker) {
                if len(keyLockers) == 1 {
                    // locker was all alone
                    err = bucketLockedContainer.Delete(k)
                    if err != nil {
                        return fmt.Errorf("delete locked object record from locked bucket: %w", err)
                    }
                } else {
                    // exclude locker
                    keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
 
                    v, err = encodeList(keyLockers)
                    if err != nil {
                        return fmt.Errorf("encode updated list of lockers: %w", err)
                    }
 
                    // update the record
                    err = bucketLockedContainer.Put(k, v)
                    if err != nil {
                        return fmt.Errorf("update list of lockers: %w", err)
                    }
                }
 
                return nil
            }
        }
 
        return nil
-            })
-        }
-    }
-
-    return nil
+    })
 }
 
 // IsLockedPrm groups the parameters of IsLocked operation.

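Reviewer note: the rewrite above is a mechanical guard-clause transformation — both nil checks now return early instead of nesting the whole ForEach body, which is what brings the function under the gocognit threshold without the removed // nolint escape hatch. A toy, self-contained illustration of the same transformation (all names invented for the example):

package main

import "fmt"

type node struct{ children map[string]*node }

// findNested: the "before" shape — each condition adds a nesting level.
func findNested(root *node, a, b string) *node {
    if root != nil {
        if child, ok := root.children[a]; ok {
            if grand, ok := child.children[b]; ok {
                return grand
            }
        }
    }
    return nil
}

// findFlat: the "after" shape — guard clauses, same behaviour, flatter control flow.
func findFlat(root *node, a, b string) *node {
    if root == nil {
        return nil
    }
    child, ok := root.children[a]
    if !ok {
        return nil
    }
    grand, ok := child.children[b]
    if !ok {
        return nil
    }
    return grand
}

func main() {
    leaf := &node{}
    root := &node{children: map[string]*node{"a": {children: map[string]*node{"b": leaf}}}}
    fmt.Println(findNested(root, "a", "b") == findFlat(root, "a", "b")) // true
}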
View file

@@ -327,11 +327,7 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
 }
 
 // TreeApply implements the Forest interface.
-func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error {
-    if !d.checkValid() {
-        return ErrInvalidCIDDescriptor
-    }
-
+func (t *boltForest) TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error {
     t.modeMtx.RLock()
     defer t.modeMtx.RUnlock()
@@ -344,7 +340,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
     if backgroundSync {
         var seen bool
         err := t.db.View(func(tx *bbolt.Tx) error {
-            treeRoot := tx.Bucket(bucketName(d.CID, treeID))
+            treeRoot := tx.Bucket(bucketName(cnr, treeID))
             if treeRoot == nil {
                 return nil
             }
@@ -362,7 +358,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
     }
 
     if t.db.MaxBatchSize == 1 {
-        fullID := bucketName(d.CID, treeID)
+        fullID := bucketName(cnr, treeID)
         return t.db.Update(func(tx *bbolt.Tx) error {
             bLog, bTree, err := t.getTreeBuckets(tx, fullID)
             if err != nil {
@@ -375,11 +371,11 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
     }
 
     ch := make(chan error, 1)
-    t.addBatch(d, treeID, m, ch)
+    t.addBatch(cnr, treeID, m, ch)
     return <-ch
 }
 
-func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
+func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
     t.mtx.Lock()
     for i := 0; i < len(t.batches); i++ {
         t.batches[i].mtx.Lock()
@@ -391,7 +387,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
             continue
         }
 
-        found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
+        found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID
         if found {
             t.batches[i].results = append(t.batches[i].results, ch)
             t.batches[i].operations = append(t.batches[i].operations, m)
@@ -412,7 +408,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
     }
 
     b := &batch{
         forest:     t,
-        cid:        d.CID,
+        cid:        cnr,
         treeID:     treeID,
         results:    []chan<- error{ch},
         operations: []*Move{m},

View file

@@ -5,6 +5,7 @@ import (
     "strings"
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
+    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 )
 
@@ -93,12 +94,8 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
 }
 
 // TreeApply implements the Forest interface.
-func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move, _ bool) error {
-    if !d.checkValid() {
-        return ErrInvalidCIDDescriptor
-    }
-
-    fullID := d.CID.String() + "/" + treeID
+func (f *memoryForest) TreeApply(cnr cid.ID, treeID string, op *Move, _ bool) error {
+    fullID := cnr.String() + "/" + treeID
     s, ok := f.treeMap[fullID]
     if !ok {
         s = newState()

View file

@@ -411,21 +411,10 @@ func TestForest_Apply(t *testing.T) {
 
 func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
     cid := cidtest.ID()
-    d := CIDDescriptor{cid, 0, 1}
     treeID := "version"
 
-    t.Run("invalid descriptor", func(t *testing.T) {
-        s := constructor(t)
-        err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{
-            Child:  10,
-            Parent: 0,
-            Meta:   Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}},
-        }, false)
-        require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
-    })
-
     testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
-        require.NoError(t, s.TreeApply(d, treeID, &Move{
+        require.NoError(t, s.TreeApply(cid, treeID, &Move{
             Child:  child,
             Parent: parent,
             Meta:   meta,
@@ -465,7 +454,6 @@ func TestForest_GetOpLog(t *testing.T) {
 
 func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
     cid := cidtest.ID()
-    d := CIDDescriptor{cid, 0, 1}
     treeID := "version"
     logs := []Move{
         {
@@ -491,7 +479,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Op
     })
 
     for i := range logs {
-        require.NoError(t, s.TreeApply(d, treeID, &logs[i], false))
+        require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false))
     }
 
     testGetOpLog := func(t *testing.T, height uint64, m Move) {
@@ -533,13 +521,12 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
     cid := cidtest.ID()
     treeID := "version"
-    d := CIDDescriptor{cid, 0, 1}
 
     t.Run("empty state, no panic", func(t *testing.T) {
         checkExists(t, false, cid, treeID)
     })
 
-    require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1}, false))
+    require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
     checkExists(t, true, cid, treeID)
     checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
     checkExists(t, false, cid, "another tree")  // same CID, different tree
@@ -570,16 +557,16 @@ func TestApplyTricky1(t *testing.T) {
     }
 
     treeID := "version"
-    d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
+    cid := cidtest.ID()
     for i := range providers {
         t.Run(providers[i].name, func(t *testing.T) {
             s := providers[i].construct(t)
             for i := range ops {
-                require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
+                require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
             }
 
             for i := range expected {
-                _, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
+                _, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
                 require.NoError(t, err)
                 require.Equal(t, expected[i].parent, parent)
             }
@@ -631,16 +618,16 @@ func TestApplyTricky2(t *testing.T) {
     }
 
     treeID := "version"
-    d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
+    cid := cidtest.ID()
     for i := range providers {
         t.Run(providers[i].name, func(t *testing.T) {
             s := providers[i].construct(t)
             for i := range ops {
-                require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
+                require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
             }
 
             for i := range expected {
-                _, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
+                _, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
                 require.NoError(t, err)
                 require.Equal(t, expected[i].parent, parent)
             }
@@ -746,12 +733,11 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
     ops := prepareRandomTree(nodeCount, opCount)
 
     cid := cidtest.ID()
-    d := CIDDescriptor{cid, 0, 1}
     treeID := "version"
 
     expected := constructor(t)
     for i := range ops {
-        require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
+        require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
     }
 
     for i := 0; i < iterCount; i++ {
@@ -766,7 +752,7 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
             go func() {
                 defer wg.Done()
                 for op := range ch {
-                    require.NoError(t, actual.TreeApply(d, treeID, op, false))
+                    require.NoError(t, actual.TreeApply(cid, treeID, op, false))
                 }
             }()
         }
@@ -792,12 +778,11 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
     ops := prepareRandomTree(nodeCount, opCount)
 
     cid := cidtest.ID()
-    d := CIDDescriptor{cid, 0, 1}
     treeID := "version"
 
     expected := constructor(t)
     for i := range ops {
-        require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
+        require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
     }
 
     const iterCount = 200
@@ -807,7 +792,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
         actual := constructor(t)
         for i := range ops {
-            require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
+            require.NoError(t, actual.TreeApply(cid, treeID, &ops[i], false))
         }
         compareForests(t, expected, actual, cid, treeID, nodeCount)
     }
@@ -889,7 +874,6 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
     ops := genFunc(b.N)
 
     cid := cidtest.ID()
-    d := CIDDescriptor{cid, 0, 1}
     treeID := "version"
     ch := make(chan int, b.N)
     for i := 0; i < b.N; i++ {
@@ -901,7 +885,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
     b.SetParallelism(10)
     b.RunParallel(func(pb *testing.PB) {
         for pb.Next() {
-            if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
+            if err := s.TreeApply(cid, treeID, &ops[<-ch], false); err != nil {
                 b.Fatalf("error in `Apply`: %v", err)
             }
         }
@@ -918,7 +902,6 @@ func TestTreeGetByPath(t *testing.T) {
 
 func testTreeGetByPath(t *testing.T, s Forest) {
     cid := cidtest.ID()
-    d := CIDDescriptor{cid, 0, 1}
     treeID := "version"
 
     // /
@@ -928,12 +911,12 @@ func testTreeGetByPath(t *testing.T, s Forest) {
     // |- cat1.jpg, Version=XXX (4)
     // |- cat1.jpg, Version=YYY (5)
     // |- cat2.jpg, Version=ZZZ (6)
-    testMove(t, s, 0, 1, 0, d, treeID, "a", "")
-    testMove(t, s, 1, 2, 0, d, treeID, "b", "")
-    testMove(t, s, 2, 3, 1, d, treeID, "cat1.jpg", "TTT")
-    testMove(t, s, 3, 4, 2, d, treeID, "cat1.jpg", "XXX")
-    testMove(t, s, 4, 5, 2, d, treeID, "cat1.jpg", "YYY")
-    testMove(t, s, 5, 6, 2, d, treeID, "cat2.jpg", "ZZZ")
+    testMove(t, s, 0, 1, 0, cid, treeID, "a", "")
+    testMove(t, s, 1, 2, 0, cid, treeID, "b", "")
+    testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT")
+    testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX")
+    testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY")
+    testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ")
 
     if mf, ok := s.(*memoryForest); ok {
         single := mf.treeMap[cid.String()+"/"+treeID]
@@ -970,14 +953,14 @@ func testTreeGetByPath(t *testing.T, s Forest) {
     })
 }
 
-func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor, treeID, filename, version string) {
+func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) {
     items := make([]KeyValue, 1, 2)
     items[0] = KeyValue{AttributeFilename, []byte(filename)}
     if version != "" {
         items = append(items, KeyValue{AttributeVersion, []byte(version)})
     }
 
-    require.NoError(t, s.TreeApply(d, treeID, &Move{
+    require.NoError(t, s.TreeApply(cid, treeID, &Move{
         Parent: parent,
         Child:  node,
         Meta: Meta{

View file

@@ -18,7 +18,7 @@ type Forest interface {
     TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
     // TreeApply applies replicated operation from another node.
     // If background is true, TreeApply will first check whether an operation exists.
-    TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error
+    TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
     // TreeGetByPath returns all nodes corresponding to the path.
     // The path is constructed by descending from the root using the values of the
     // AttributeFilename in meta.

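Reviewer note: this interface change is the thread running through most files in the PR — TreeApply now takes a bare container ID instead of a pilorama.CIDDescriptor, and descriptor validation is gone from the implementations. A minimal caller written against the new signature might look as follows (the pilorama import path is assumed from the package layout seen elsewhere in this PR; the helper name is invented):

package example

import (
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
    cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

// applyReplicatedMove shows the new call shape: the container ID is passed
// directly and no CIDDescriptor has to be built by the caller.
// backgroundSync=false applies the operation unconditionally, as the local
// replication worker does; true makes the forest skip already-seen operations.
func applyReplicatedMove(f pilorama.Forest, cnr cidSDK.ID, treeID string, m *pilorama.Move) error {
    return f.TreeApply(cnr, treeID, m, false)
}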
View file

@@ -155,7 +155,6 @@ func (s *Shard) Init() error {
     return nil
 }
 
-// nolint: funlen
 func (s *Shard) refillMetabase() error {
     err := s.metaBase.Reset()
     if err != nil {
@@ -172,57 +171,23 @@ func (s *Shard) refillMetabase() error {
             return nil
         }
 
-        // nolint: exhaustive
+        var err error
         switch obj.Type() {
         case objectSDK.TypeTombstone:
-            tombstone := objectSDK.NewTombstone()
-            if err := tombstone.Unmarshal(obj.Payload()); err != nil {
-                return fmt.Errorf("could not unmarshal tombstone content: %w", err)
-            }
-
-            tombAddr := object.AddressOf(obj)
-            memberIDs := tombstone.Members()
-            tombMembers := make([]oid.Address, 0, len(memberIDs))
-
-            for i := range memberIDs {
-                a := tombAddr
-                a.SetObject(memberIDs[i])
-                tombMembers = append(tombMembers, a)
-            }
-
-            var inhumePrm meta.InhumePrm
-            inhumePrm.SetTombstoneAddress(tombAddr)
-            inhumePrm.SetAddresses(tombMembers...)
-
-            _, err = s.metaBase.Inhume(inhumePrm)
-            if err != nil {
-                return fmt.Errorf("could not inhume objects: %w", err)
-            }
+            err = s.refillTombstoneObject(obj)
         case objectSDK.TypeLock:
-            var lock objectSDK.Lock
-            if err := lock.Unmarshal(obj.Payload()); err != nil {
-                return fmt.Errorf("could not unmarshal lock content: %w", err)
-            }
-
-            locked := make([]oid.ID, lock.NumberOfMembers())
-            lock.ReadMembers(locked)
-
-            cnr, _ := obj.ContainerID()
-            id, _ := obj.ID()
-            err = s.metaBase.Lock(cnr, id, locked)
-            if err != nil {
-                return fmt.Errorf("could not lock objects: %w", err)
-            }
+            err = s.refillLockObject(obj)
+        default:
+        }
+        if err != nil {
+            return err
         }
 
         var mPrm meta.PutPrm
         mPrm.SetObject(obj)
         mPrm.SetStorageID(descriptor)
 
-        _, err := s.metaBase.Put(mPrm)
+        _, err = s.metaBase.Put(mPrm)
         if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
             return err
         }
@@ -241,6 +206,54 @@ func (s *Shard) refillMetabase() error {
     return nil
 }
 
+func (s *Shard) refillLockObject(obj *objectSDK.Object) error {
+    var lock objectSDK.Lock
+    if err := lock.Unmarshal(obj.Payload()); err != nil {
+        return fmt.Errorf("could not unmarshal lock content: %w", err)
+    }
+
+    locked := make([]oid.ID, lock.NumberOfMembers())
+    lock.ReadMembers(locked)
+
+    cnr, _ := obj.ContainerID()
+    id, _ := obj.ID()
+    err := s.metaBase.Lock(cnr, id, locked)
+    if err != nil {
+        return fmt.Errorf("could not lock objects: %w", err)
+    }
+    return nil
+}
+
+func (s *Shard) refillTombstoneObject(obj *objectSDK.Object) error {
+    tombstone := objectSDK.NewTombstone()
+    if err := tombstone.Unmarshal(obj.Payload()); err != nil {
+        return fmt.Errorf("could not unmarshal tombstone content: %w", err)
+    }
+
+    tombAddr := object.AddressOf(obj)
+    memberIDs := tombstone.Members()
+    tombMembers := make([]oid.Address, 0, len(memberIDs))
+
+    for i := range memberIDs {
+        a := tombAddr
+        a.SetObject(memberIDs[i])
+        tombMembers = append(tombMembers, a)
+    }
+
+    var inhumePrm meta.InhumePrm
+    inhumePrm.SetTombstoneAddress(tombAddr)
+    inhumePrm.SetAddresses(tombMembers...)
+
+    _, err := s.metaBase.Inhume(inhumePrm)
+    if err != nil {
+        return fmt.Errorf("could not inhume objects: %w", err)
+    }
+    return nil
+}
+
 // Close releases all Shard's components.
 func (s *Shard) Close() error {
     components := []interface{ Close() error }{}

View file

@@ -42,7 +42,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
 }
 
 // TreeApply implements the pilorama.Forest interface.
-func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
+func (s *Shard) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
     if s.pilorama == nil {
         return ErrPiloramaDisabled
     }
@@ -53,7 +53,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M
     if s.info.Mode.ReadOnly() {
         return ErrReadOnlyMode
     }
-    return s.pilorama.TreeApply(d, treeID, m, backgroundSync)
+    return s.pilorama.TreeApply(cnr, treeID, m, backgroundSync)
 }
 
 // TreeGetByPath implements the pilorama.Forest interface.

View file

@@ -426,7 +426,7 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
     return nil
 }
 
-// nolint: funlen, gocognit
+// nolint: funlen
 func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
     body := req.GetBody()
 
@@ -459,138 +459,140 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
         body: resp.GetBody(),
     })
 
-    if !commonPrm.LocalOnly() {
+    if commonPrm.LocalOnly() {
+        return p, nil
+    }
+
     var onceResign sync.Once
     p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
         var err error
 
         key, err := s.keyStorage.GetKey(nil)
         if err != nil {
             return nil, err
         }
 
         // once compose and resign forwarding request
         onceResign.Do(func() {
             // compose meta header of the local server
             metaHdr := new(session.RequestMetaHeader)
             metaHdr.SetTTL(meta.GetTTL() - 1)
             // TODO: #1165 think how to set the other fields
             metaHdr.SetOrigin(meta)
             writeCurrentVersion(metaHdr)
 
             req.SetMetaHeader(metaHdr)
 
             err = signature.SignServiceMessage(key, req)
         })
         if err != nil {
             return nil, err
         }
 
         // code below is copy-pasted from c.GetObjectHeader implementation,
         // perhaps it is worth highlighting the utility function in frostfs-api-go
 
         // send Head request
         var headResp *objectV2.HeadResponse
         err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
             headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
             return err
         })
         if err != nil {
             return nil, fmt.Errorf("sending the request failed: %w", err)
         }
 
         // verify response key
         if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
             return nil, err
         }
 
         // verify response structure
         if err := signature.VerifyServiceMessage(headResp); err != nil {
             return nil, fmt.Errorf("response verification failed: %w", err)
         }
 
         if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
             return nil, err
         }
 
         var (
             hdr   *objectV2.Header
             idSig *refs.Signature
         )
 
         switch v := headResp.GetBody().GetHeaderPart().(type) {
         case nil:
             return nil, fmt.Errorf("unexpected header type %T", v)
         case *objectV2.ShortHeader:
             if !body.GetMainOnly() {
                 return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
                     (*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
                 )
             }
 
             h := v
 
             hdr = new(objectV2.Header)
             hdr.SetPayloadLength(h.GetPayloadLength())
             hdr.SetVersion(h.GetVersion())
             hdr.SetOwnerID(h.GetOwnerID())
             hdr.SetObjectType(h.GetObjectType())
             hdr.SetCreationEpoch(h.GetCreationEpoch())
             hdr.SetPayloadHash(h.GetPayloadHash())
             hdr.SetHomomorphicHash(h.GetHomomorphicHash())
         case *objectV2.HeaderWithSignature:
             if body.GetMainOnly() {
                 return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
                     (*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
                 )
             }
 
             hdrWithSig := v
             if hdrWithSig == nil {
                 return nil, errors.New("nil object part")
             }
 
             hdr = hdrWithSig.GetHeader()
             idSig = hdrWithSig.GetSignature()
 
             if idSig == nil {
                 // TODO(@cthulhu-rider): #1387 use "const" error
                 return nil, errors.New("missing signature")
             }
 
             binID, err := objAddr.Object().Marshal()
             if err != nil {
                 return nil, fmt.Errorf("marshal ID: %w", err)
             }
 
             var sig frostfscrypto.Signature
             if err := sig.ReadFromV2(*idSig); err != nil {
                 return nil, fmt.Errorf("can't read signature: %w", err)
             }
 
             if !sig.Verify(binID) {
                 return nil, errors.New("invalid object ID signature")
             }
         case *objectV2.SplitInfo:
             si := object.NewSplitInfoFromV2(v)
 
             return nil, object.NewSplitInfoError(si)
         }
 
         objv2 := new(objectV2.Object)
         objv2.SetHeader(hdr)
         objv2.SetSignature(idSig)
 
         obj := object.NewFromV2(objv2)
         obj.SetID(objAddr.Object())
 
         // convert the object
         return obj, nil
     }))
-    }
 
     return p, nil
 }

View file

@@ -27,7 +27,7 @@ type replicationTask struct {
 
 type applyOp struct {
     treeID string
-    pilorama.CIDDescriptor
+    cid    cidSDK.ID
     pilorama.Move
 }
 
@@ -43,7 +43,7 @@ func (s *Service) localReplicationWorker() {
         case <-s.closeCh:
             return
         case op := <-s.replicateLocalCh:
-            err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
+            err := s.forest.TreeApply(op.cid, op.treeID, &op.Move, false)
             if err != nil {
                 s.log.Error("failed to apply replicated operation",
                     zap.String("err", err.Error()))
@@ -52,7 +52,7 @@
         }
     }
 }
 
-func (s *Service) replicationWorker() {
+func (s *Service) replicationWorker(ctx context.Context) {
     for {
         select {
         case <-s.closeCh:
@@ -64,13 +64,13 @@
             task.n.IterateNetworkEndpoints(func(addr string) bool {
                 lastAddr = addr
 
-                c, err := s.cache.get(context.Background(), addr)
+                c, err := s.cache.get(ctx, addr)
                 if err != nil {
                     lastErr = fmt.Errorf("can't create client: %w", err)
                     return false
                 }
 
-                ctx, cancel := context.WithTimeout(context.Background(), s.replicatorTimeout)
+                ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
                 _, lastErr = c.Apply(ctx, task.req)
                 cancel()
 
@@ -94,8 +94,7 @@ func (s *Service) replicationWorker() {
 func (s *Service) replicateLoop(ctx context.Context) {
     for i := 0; i < s.replicatorWorkerCount; i++ {
-        //nolint: contextcheck
-        go s.replicationWorker()
+        go s.replicationWorker(ctx)
         go s.localReplicationWorker()
     }
 
     defer func() {

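Reviewer note: replicationWorker now threads the service context through instead of reaching for context.Background(), so per-request timeouts are children of the loop's context and cancelling it also aborts in-flight Apply calls — the property the removed //nolint: contextcheck annotation was papering over. A minimal standalone sketch of the same pattern (all names invented for the example):

package main

import (
    "context"
    "fmt"
    "time"
)

// worker derives every per-request timeout from the parent context it was
// started with, so cancelling the parent stops the whole pipeline.
func worker(ctx context.Context, tasks <-chan int, timeout time.Duration) {
    for {
        select {
        case <-ctx.Done():
            return
        case t, ok := <-tasks:
            if !ok {
                return
            }
            reqCtx, cancel := context.WithTimeout(ctx, timeout)
            process(reqCtx, t)
            cancel()
        }
    }
}

func process(ctx context.Context, t int) {
    select {
    case <-ctx.Done():
        fmt.Println("task cancelled:", t)
    case <-time.After(10 * time.Millisecond):
        fmt.Println("task done:", t)
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    tasks := make(chan int, 2)
    tasks <- 1
    tasks <- 2
    close(tasks)
    go worker(ctx, tasks, 50*time.Millisecond)
    time.Sleep(100 * time.Millisecond)
    cancel()
}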
View file

@@ -468,7 +468,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
     key := req.GetSignature().GetKey()
 
-    _, pos, size, err := s.getContainerInfo(cid, key)
+    _, pos, _, err := s.getContainerInfo(cid, key)
     if err != nil {
         return nil, err
     }
@@ -485,8 +485,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
     select {
     case s.replicateLocalCh <- applyOp{
         treeID:        req.GetBody().GetTreeId(),
-        CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
+        cid:           cid,
         Move: pilorama.Move{
             Parent: op.GetParentId(),
             Child:  op.GetChildId(),

View file

@@ -14,6 +14,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
     cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+    cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
     "github.com/panjf2000/ants/v2"
     "go.uber.org/zap"
@@ -40,11 +41,6 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
         return ErrNotInContainer
     }
 
-    var d pilorama.CIDDescriptor
-    d.CID = cid
-    d.Position = pos
-    d.Size = len(nodes)
-
     nodes = randomizeNodeOrder(nodes, pos)
     if len(nodes) == 0 {
         return nil
@@ -87,18 +83,18 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
     }
 
     for _, tid := range treesToSync {
-        h, err := s.forest.TreeLastSyncHeight(d.CID, tid)
+        h, err := s.forest.TreeLastSyncHeight(cid, tid)
         if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
             s.log.Warn("could not get last synchronized height for a tree",
-                zap.Stringer("cid", d.CID),
+                zap.Stringer("cid", cid),
                 zap.String("tree", tid))
             continue
         }
-        newHeight := s.synchronizeTree(ctx, d, h, tid, nodes)
+        newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes)
         if h < newHeight {
-            if err := s.forest.TreeUpdateLastSyncHeight(d.CID, tid, newHeight); err != nil {
+            if err := s.forest.TreeUpdateLastSyncHeight(cid, tid, newHeight); err != nil {
                 s.log.Warn("could not update last synchronized height for a tree",
-                    zap.Stringer("cid", d.CID),
+                    zap.Stringer("cid", cid),
                     zap.String("tree", tid))
             }
         }
@@ -118,24 +114,19 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
         return ErrNotInContainer
     }
 
-    var d pilorama.CIDDescriptor
-    d.CID = cid
-    d.Position = pos
-    d.Size = len(nodes)
-
     nodes = randomizeNodeOrder(nodes, pos)
     if len(nodes) == 0 {
         return nil
     }
 
-    s.synchronizeTree(ctx, d, 0, treeID, nodes)
+    s.synchronizeTree(ctx, cid, 0, treeID, nodes)
     return nil
 }
 
-func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, from uint64,
+func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint64,
     treeID string, nodes []netmapSDK.NodeInfo) uint64 {
     s.log.Debug("synchronize tree",
-        zap.Stringer("cid", d.CID),
+        zap.Stringer("cid", cid),
         zap.String("tree", treeID),
         zap.Uint64("from", from))
@@ -157,7 +148,7 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
     treeClient := NewTreeServiceClient(cc)
     for {
-        h, err := s.synchronizeSingle(ctx, d, treeID, height, treeClient)
+        h, err := s.synchronizeSingle(ctx, cid, treeID, height, treeClient)
         if height < h {
             height = h
         }
@@ -179,9 +170,9 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
     return newHeight
 }
 
-func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescriptor, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
+func (s *Service) synchronizeSingle(ctx context.Context, cid cidSDK.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
     rawCID := make([]byte, sha256.Size)
-    d.CID.Encode(rawCID)
+    cid.Encode(rawCID)
 
     for {
         newHeight := height
@@ -211,7 +202,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto
             if err := m.Meta.FromBytes(lm.Meta); err != nil {
                 return newHeight, err
             }
-            if err := s.forest.TreeApply(d, treeID, m, true); err != nil {
+            if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
                 return newHeight, err
             }
             if m.Time > newHeight {
@@ -255,7 +246,6 @@ func (s *Service) SynchronizeAll() error {
     }
 }
 
-// nolint: funlen, gocognit
 func (s *Service) syncLoop(ctx context.Context) {
     for {
         select {
@@ -272,86 +262,99 @@ func (s *Service) syncLoop(ctx context.Context) {
                 continue
             }
 
-            newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
-            cnrsToSync := make([]cid.ID, 0, len(cnrs))
-            var removed []cid.ID
-            for _, cnr := range cnrs {
-                _, pos, err := s.getContainerNodes(cnr)
-                if err != nil {
-                    s.log.Error("could not calculate container nodes",
-                        zap.Stringer("cid", cnr),
-                        zap.Error(err))
-                    continue
-                }
-
-                if pos < 0 {
-                    // node is not included in the container.
-                    continue
-                }
-
-                newMap[cnr] = struct{}{}
-                cnrsToSync = append(cnrsToSync, cnr)
-            }
-
-            // sync new containers
-            var wg sync.WaitGroup
-            for _, cnr := range cnrsToSync {
-                wg.Add(1)
-                cnr := cnr
-                err := s.syncPool.Submit(func() {
-                    defer wg.Done()
-                    s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
-
-                    err := s.synchronizeAllTrees(ctx, cnr)
-                    if err != nil {
-                        s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
-                        return
-                    }
-
-                    s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
-                })
-                if err != nil {
-                    wg.Done()
-                    s.log.Error("could not query trees for synchronization",
-                        zap.Stringer("cid", cnr),
-                        zap.Error(err))
-                    if errors.Is(err, ants.ErrPoolClosed) {
-                        return
-                    }
-                }
-            }
-            wg.Wait()
-
-            s.cnrMapMtx.Lock()
-            for cnr := range s.cnrMap {
-                if _, ok := newMap[cnr]; ok {
-                    continue
-                }
-                removed = append(removed, cnr)
-            }
-            for i := range removed {
-                delete(s.cnrMap, removed[i])
-            }
-            s.cnrMapMtx.Unlock()
-
-            for _, cnr := range removed {
-                s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
-
-                err = s.DropTree(ctx, cnr, "")
-                if err != nil {
-                    s.log.Error("could not remove redundant tree",
-                        zap.Stringer("cid", cnr),
-                        zap.Error(err))
-                    continue
-                }
-            }
+            newMap, cnrsToSync := s.containersToSync(cnrs)
+
+            s.syncContainers(ctx, cnrsToSync)
+
+            s.removeContainers(ctx, newMap)
 
             s.log.Debug("trees have been synchronized")
         }
     }
 }
 
+func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
+    // sync new containers
+    var wg sync.WaitGroup
+    for _, cnr := range cnrs {
+        wg.Add(1)
+        cnr := cnr
+        err := s.syncPool.Submit(func() {
+            defer wg.Done()
+            s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
+
+            err := s.synchronizeAllTrees(ctx, cnr)
+            if err != nil {
+                s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
+                return
+            }
+
+            s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
+        })
+        if err != nil {
+            wg.Done()
+            s.log.Error("could not query trees for synchronization",
+                zap.Stringer("cid", cnr),
+                zap.Error(err))
+            if errors.Is(err, ants.ErrPoolClosed) {
+                return
+            }
+        }
+    }
+    wg.Wait()
+}
+
+func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
+    s.cnrMapMtx.Lock()
+    defer s.cnrMapMtx.Unlock()
+
+    var removed []cid.ID
+    for cnr := range s.cnrMap {
+        if _, ok := newContainers[cnr]; ok {
+            continue
+        }
+        removed = append(removed, cnr)
+    }
+    for i := range removed {
+        delete(s.cnrMap, removed[i])
+    }
+
+    for _, cnr := range removed {
+        s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
+
+        err := s.DropTree(ctx, cnr, "")
+        if err != nil {
+            s.log.Error("could not remove redundant tree",
+                zap.Stringer("cid", cnr),
+                zap.Error(err))
+        }
+    }
+}
+
+func (s *Service) containersToSync(cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) {
+    newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
+    cnrsToSync := make([]cid.ID, 0, len(cnrs))
+
+    for _, cnr := range cnrs {
+        _, pos, err := s.getContainerNodes(cnr)
+        if err != nil {
+            s.log.Error("could not calculate container nodes",
+                zap.Stringer("cid", cnr),
+                zap.Error(err))
+            continue
+        }
+
+        if pos < 0 {
+            // node is not included in the container.
+            continue
+        }
+
+        newMap[cnr] = struct{}{}
+        cnrsToSync = append(cnrsToSync, cnr)
+    }
+    return newMap, cnrsToSync
+}
+
 // randomizeNodeOrder shuffles nodes and removes not a `pos` index.
 // It is assumed that 0 <= pos < len(nodes).
 func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
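Reviewer note: syncContainers above keeps the original submit-to-pool bookkeeping — wg.Add before the ants Submit, wg.Done inside the task, and a compensating wg.Done when Submit itself fails so Wait cannot hang. A small standalone illustration of that pattern with the real ants API and a toy payload (all other names invented for the example):

package main

import (
    "fmt"
    "sync"

    "github.com/panjf2000/ants/v2"
)

func main() {
    pool, err := ants.NewPool(4)
    if err != nil {
        panic(err)
    }
    defer pool.Release()

    items := []int{1, 2, 3, 4, 5}

    var wg sync.WaitGroup
    for _, it := range items {
        wg.Add(1)
        it := it // capture the loop variable, as the PR does with cnr := cnr
        if err := pool.Submit(func() {
            defer wg.Done()
            fmt.Println("synced", it)
        }); err != nil {
            // the task never ran, so balance the Add manually
            wg.Done()
            fmt.Println("submit failed:", err)
        }
    }
    wg.Wait()
}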