Fix some of the linter exceptions #156
13 changed files with 460 additions and 444 deletions
|
@ -44,7 +44,6 @@ const (
|
|||
notaryEnabled = true
|
||||
)
|
||||
|
||||
// nolint: funlen, gocognit
|
||||
func dumpBalances(cmd *cobra.Command, _ []string) error {
|
||||
var (
|
||||
dumpStorage, _ = cmd.Flags().GetBool(dumpBalancesStorageFlag)
|
||||
|
@ -84,86 +83,110 @@ func dumpBalances(cmd *cobra.Command, _ []string) error {
|
|||
printBalances(cmd, "Inner ring nodes balances:", irList)
|
||||
|
||||
if dumpStorage {
|
||||
arr, err := unwrap.Array(inv.Call(nmHash, "netmap"))
|
||||
if err != nil {
|
||||
return errors.New("can't fetch the list of storage nodes")
|
||||
}
|
||||
|
||||
snList := make([]accBalancePair, len(arr))
|
||||
for i := range arr {
|
||||
node, ok := arr[i].Value().([]stackitem.Item)
|
||||
if !ok || len(node) == 0 {
|
||||
return errors.New("can't parse the list of storage nodes")
|
||||
}
|
||||
bs, err := node[0].TryBytes()
|
||||
if err != nil {
|
||||
return errors.New("can't parse the list of storage nodes")
|
||||
}
|
||||
var ni netmap.NodeInfo
|
||||
if err := ni.Unmarshal(bs); err != nil {
|
||||
return fmt.Errorf("can't parse the list of storage nodes: %w", err)
|
||||
}
|
||||
pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256())
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't parse storage node public key: %w", err)
|
||||
}
|
||||
snList[i].scriptHash = pub.GetScriptHash()
|
||||
}
|
||||
|
||||
if err := fetchBalances(inv, gas.Hash, snList); err != nil {
|
||||
if err := printStorageNodeBalances(cmd, inv, nmHash); err != nil {
|
||||
return err
|
||||
}
|
||||
printBalances(cmd, "\nStorage node balances:", snList)
|
||||
}
|
||||
|
||||
if dumpProxy {
|
||||
h, err := nnsResolveHash(inv, nnsCs.Hash, proxyContract+".frostfs")
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't get hash of the proxy contract: %w", err)
|
||||
}
|
||||
|
||||
proxyList := []accBalancePair{{scriptHash: h}}
|
||||
if err := fetchBalances(inv, gas.Hash, proxyList); err != nil {
|
||||
if err := printProxyContractBalance(cmd, inv, nnsCs.Hash); err != nil {
|
||||
return err
|
||||
}
|
||||
printBalances(cmd, "\nProxy contract balance:", proxyList)
|
||||
}
|
||||
|
||||
if dumpAlphabet {
|
||||
alphaList := make([]accBalancePair, len(irList))
|
||||
|
||||
w := io.NewBufBinWriter()
|
||||
for i := range alphaList {
|
||||
emit.AppCall(w.BinWriter, nnsCs.Hash, "resolve", callflag.ReadOnly,
|
||||
getAlphabetNNSDomain(i),
|
||||
int64(nns.TXT))
|
||||
}
|
||||
if w.Err != nil {
|
||||
panic(w.Err)
|
||||
}
|
||||
|
||||
alphaRes, err := c.InvokeScript(w.Bytes(), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't fetch info from NNS: %w", err)
|
||||
}
|
||||
|
||||
for i := range alphaList {
|
||||
h, err := parseNNSResolveResult(alphaRes.Stack[i])
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err)
|
||||
}
|
||||
alphaList[i].scriptHash = h
|
||||
}
|
||||
|
||||
if err := fetchBalances(inv, gas.Hash, alphaList); err != nil {
|
||||
if err := printAlphabetContractBalances(cmd, c, inv, len(irList), nnsCs.Hash); err != nil {
|
||||
return err
|
||||
}
|
||||
printBalances(cmd, "\nAlphabet contracts balances:", alphaList)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func printStorageNodeBalances(cmd *cobra.Command, inv *invoker.Invoker, nmHash util.Uint160) error {
|
||||
arr, err := unwrap.Array(inv.Call(nmHash, "netmap"))
|
||||
if err != nil {
|
||||
return errors.New("can't fetch the list of storage nodes")
|
||||
}
|
||||
|
||||
snList := make([]accBalancePair, len(arr))
|
||||
for i := range arr {
|
||||
node, ok := arr[i].Value().([]stackitem.Item)
|
||||
if !ok || len(node) == 0 {
|
||||
return errors.New("can't parse the list of storage nodes")
|
||||
}
|
||||
bs, err := node[0].TryBytes()
|
||||
if err != nil {
|
||||
return errors.New("can't parse the list of storage nodes")
|
||||
}
|
||||
var ni netmap.NodeInfo
|
||||
if err := ni.Unmarshal(bs); err != nil {
|
||||
return fmt.Errorf("can't parse the list of storage nodes: %w", err)
|
||||
}
|
||||
pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256())
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't parse storage node public key: %w", err)
|
||||
}
|
||||
snList[i].scriptHash = pub.GetScriptHash()
|
||||
}
|
||||
|
||||
if err := fetchBalances(inv, gas.Hash, snList); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
printBalances(cmd, "\nStorage node balances:", snList)
|
||||
return nil
|
||||
}
|
||||
|
||||
func printProxyContractBalance(cmd *cobra.Command, inv *invoker.Invoker, nnsHash util.Uint160) error {
|
||||
h, err := nnsResolveHash(inv, nnsHash, proxyContract+".frostfs")
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't get hash of the proxy contract: %w", err)
|
||||
}
|
||||
|
||||
proxyList := []accBalancePair{{scriptHash: h}}
|
||||
if err := fetchBalances(inv, gas.Hash, proxyList); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
printBalances(cmd, "\nProxy contract balance:", proxyList)
|
||||
return nil
|
||||
}
|
||||
|
||||
func printAlphabetContractBalances(cmd *cobra.Command, c Client, inv *invoker.Invoker, count int, nnsHash util.Uint160) error {
|
||||
alphaList := make([]accBalancePair, count)
|
||||
|
||||
w := io.NewBufBinWriter()
|
||||
for i := range alphaList {
|
||||
emit.AppCall(w.BinWriter, nnsHash, "resolve", callflag.ReadOnly,
|
||||
getAlphabetNNSDomain(i),
|
||||
int64(nns.TXT))
|
||||
}
|
||||
if w.Err != nil {
|
||||
panic(w.Err)
|
||||
}
|
||||
|
||||
alphaRes, err := c.InvokeScript(w.Bytes(), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't fetch info from NNS: %w", err)
|
||||
}
|
||||
|
||||
for i := range alphaList {
|
||||
h, err := parseNNSResolveResult(alphaRes.Stack[i])
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err)
|
||||
}
|
||||
alphaList[i].scriptHash = h
|
||||
}
|
||||
|
||||
if err := fetchBalances(inv, gas.Hash, alphaList); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
printBalances(cmd, "\nAlphabet contracts balances:", alphaList)
|
||||
return nil
|
||||
}
|
||||
|
||||
func fetchIRNodes(c Client, nmHash, desigHash util.Uint160) ([]accBalancePair, error) {
|
||||
var irList []accBalancePair
|
||||
|
||||
|
|
|
@ -51,17 +51,17 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a
|
|||
}
|
||||
|
||||
// TreeApply implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
||||
index, lst, err := e.getTreeShard(d.CID, treeID)
|
||||
func (e *StorageEngine) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
||||
index, lst, err := e.getTreeShard(cnr, treeID)
|
||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
err = lst[index].TreeApply(d, treeID, m, backgroundSync)
|
||||
err = lst[index].TreeApply(cnr, treeID, m, backgroundSync)
|
||||
if err != nil {
|
||||
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
||||
e.reportShardError(lst[index], "can't perform `TreeApply`", err,
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.String("tree", treeID))
|
||||
}
|
||||
return err
|
||||
|
|
|
@ -138,57 +138,57 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
|
|||
// Operation is very resource-intensive, which is caused by the admissibility
|
||||
// of multiple locks. Also, if we knew what objects are locked, it would be
|
||||
// possible to speed up the execution.
|
||||
//
|
||||
// nolint: gocognit
|
||||
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
|
||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
||||
if bucketLocked != nil {
|
||||
key := make([]byte, cidSize)
|
||||
idCnr.Encode(key)
|
||||
if bucketLocked == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
bucketLockedContainer := bucketLocked.Bucket(key)
|
||||
if bucketLockedContainer != nil {
|
||||
keyLocker := objectKey(locker, key)
|
||||
return bucketLockedContainer.ForEach(func(k, v []byte) error {
|
||||
keyLockers, err := decodeList(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
||||
}
|
||||
key := make([]byte, cidSize)
|
||||
idCnr.Encode(key)
|
||||
|
||||
for i := range keyLockers {
|
||||
if bytes.Equal(keyLockers[i], keyLocker) {
|
||||
if len(keyLockers) == 1 {
|
||||
// locker was all alone
|
||||
err = bucketLockedContainer.Delete(k)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
|
||||
}
|
||||
} else {
|
||||
// exclude locker
|
||||
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
|
||||
bucketLockedContainer := bucketLocked.Bucket(key)
|
||||
if bucketLockedContainer == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
v, err = encodeList(keyLockers)
|
||||
if err != nil {
|
||||
return fmt.Errorf("encode updated list of lockers: %w", err)
|
||||
}
|
||||
keyLocker := objectKey(locker, key)
|
||||
return bucketLockedContainer.ForEach(func(k, v []byte) error {
|
||||
keyLockers, err := decodeList(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
||||
}
|
||||
|
||||
// update the record
|
||||
err = bucketLockedContainer.Put(k, v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("update list of lockers: %w", err)
|
||||
}
|
||||
}
|
||||
for i := range keyLockers {
|
||||
if bytes.Equal(keyLockers[i], keyLocker) {
|
||||
if len(keyLockers) == 1 {
|
||||
// locker was all alone
|
||||
err = bucketLockedContainer.Delete(k)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
|
||||
}
|
||||
} else {
|
||||
// exclude locker
|
||||
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
|
||||
|
||||
return nil
|
||||
v, err = encodeList(keyLockers)
|
||||
if err != nil {
|
||||
return fmt.Errorf("encode updated list of lockers: %w", err)
|
||||
}
|
||||
|
||||
// update the record
|
||||
err = bucketLockedContainer.Put(k, v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("update list of lockers: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// IsLockedPrm groups the parameters of IsLocked operation.
|
||||
|
|
|
@ -327,11 +327,7 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
|
|||
}
|
||||
|
||||
// TreeApply implements the Forest interface.
|
||||
func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error {
|
||||
if !d.checkValid() {
|
||||
return ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
||||
func (t *boltForest) TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error {
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
|
@ -344,7 +340,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
|||
if backgroundSync {
|
||||
var seen bool
|
||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||
treeRoot := tx.Bucket(bucketName(d.CID, treeID))
|
||||
treeRoot := tx.Bucket(bucketName(cnr, treeID))
|
||||
if treeRoot == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -362,7 +358,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
|||
}
|
||||
|
||||
if t.db.MaxBatchSize == 1 {
|
||||
fullID := bucketName(d.CID, treeID)
|
||||
fullID := bucketName(cnr, treeID)
|
||||
return t.db.Update(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
|
||||
if err != nil {
|
||||
|
@ -375,11 +371,11 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
|||
}
|
||||
|
||||
ch := make(chan error, 1)
|
||||
t.addBatch(d, treeID, m, ch)
|
||||
t.addBatch(cnr, treeID, m, ch)
|
||||
return <-ch
|
||||
}
|
||||
|
||||
func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
|
||||
func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
|
||||
t.mtx.Lock()
|
||||
for i := 0; i < len(t.batches); i++ {
|
||||
t.batches[i].mtx.Lock()
|
||||
|
@ -391,7 +387,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
|
|||
continue
|
||||
}
|
||||
|
||||
found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
|
||||
found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID
|
||||
if found {
|
||||
t.batches[i].results = append(t.batches[i].results, ch)
|
||||
t.batches[i].operations = append(t.batches[i].operations, m)
|
||||
|
@ -412,7 +408,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
|
|||
}
|
||||
b := &batch{
|
||||
forest: t,
|
||||
cid: d.CID,
|
||||
cid: cnr,
|
||||
treeID: treeID,
|
||||
results: []chan<- error{ch},
|
||||
operations: []*Move{m},
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
|
@ -93,12 +94,8 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
|
|||
}
|
||||
|
||||
// TreeApply implements the Forest interface.
|
||||
func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move, _ bool) error {
|
||||
if !d.checkValid() {
|
||||
return ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
||||
fullID := d.CID.String() + "/" + treeID
|
||||
func (f *memoryForest) TreeApply(cnr cid.ID, treeID string, op *Move, _ bool) error {
|
||||
fullID := cnr.String() + "/" + treeID
|
||||
s, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
s = newState()
|
||||
|
|
|
@ -411,21 +411,10 @@ func TestForest_Apply(t *testing.T) {
|
|||
|
||||
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
t.Run("invalid descriptor", func(t *testing.T) {
|
||||
s := constructor(t)
|
||||
err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{
|
||||
Child: 10,
|
||||
Parent: 0,
|
||||
Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}},
|
||||
}, false)
|
||||
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
|
||||
})
|
||||
|
||||
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
|
||||
require.NoError(t, s.TreeApply(d, treeID, &Move{
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||
Child: child,
|
||||
Parent: parent,
|
||||
Meta: meta,
|
||||
|
@ -465,7 +454,6 @@ func TestForest_GetOpLog(t *testing.T) {
|
|||
|
||||
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
logs := []Move{
|
||||
{
|
||||
|
@ -491,7 +479,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Op
|
|||
})
|
||||
|
||||
for i := range logs {
|
||||
require.NoError(t, s.TreeApply(d, treeID, &logs[i], false))
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false))
|
||||
}
|
||||
|
||||
testGetOpLog := func(t *testing.T, height uint64, m Move) {
|
||||
|
@ -533,13 +521,12 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
|
|||
|
||||
cid := cidtest.ID()
|
||||
treeID := "version"
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
|
||||
t.Run("empty state, no panic", func(t *testing.T) {
|
||||
checkExists(t, false, cid, treeID)
|
||||
})
|
||||
|
||||
require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1}, false))
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
|
||||
checkExists(t, true, cid, treeID)
|
||||
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
|
||||
checkExists(t, false, cid, "another tree") // same CID, different tree
|
||||
|
@ -570,16 +557,16 @@ func TestApplyTricky1(t *testing.T) {
|
|||
}
|
||||
|
||||
treeID := "version"
|
||||
d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
|
||||
cid := cidtest.ID()
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
s := providers[i].construct(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
|
||||
}
|
||||
|
||||
for i := range expected {
|
||||
_, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
|
||||
_, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected[i].parent, parent)
|
||||
}
|
||||
|
@ -631,16 +618,16 @@ func TestApplyTricky2(t *testing.T) {
|
|||
}
|
||||
|
||||
treeID := "version"
|
||||
d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
|
||||
cid := cidtest.ID()
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
s := providers[i].construct(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
|
||||
}
|
||||
|
||||
for i := range expected {
|
||||
_, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
|
||||
_, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected[i].parent, parent)
|
||||
}
|
||||
|
@ -746,12 +733,11 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
|
|||
ops := prepareRandomTree(nodeCount, opCount)
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
expected := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
||||
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
|
||||
}
|
||||
|
||||
for i := 0; i < iterCount; i++ {
|
||||
|
@ -766,7 +752,7 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
|
|||
go func() {
|
||||
defer wg.Done()
|
||||
for op := range ch {
|
||||
require.NoError(t, actual.TreeApply(d, treeID, op, false))
|
||||
require.NoError(t, actual.TreeApply(cid, treeID, op, false))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -792,12 +778,11 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
|
|||
ops := prepareRandomTree(nodeCount, opCount)
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
expected := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
||||
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
|
||||
}
|
||||
|
||||
const iterCount = 200
|
||||
|
@ -807,7 +792,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
|
|||
|
||||
actual := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
|
||||
require.NoError(t, actual.TreeApply(cid, treeID, &ops[i], false))
|
||||
}
|
||||
compareForests(t, expected, actual, cid, treeID, nodeCount)
|
||||
}
|
||||
|
@ -889,7 +874,6 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
|||
|
||||
ops := genFunc(b.N)
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
ch := make(chan int, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
@ -901,7 +885,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
|||
b.SetParallelism(10)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
|
||||
if err := s.TreeApply(cid, treeID, &ops[<-ch], false); err != nil {
|
||||
b.Fatalf("error in `Apply`: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -918,7 +902,6 @@ func TestTreeGetByPath(t *testing.T) {
|
|||
|
||||
func testTreeGetByPath(t *testing.T, s Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
// /
|
||||
|
@ -928,12 +911,12 @@ func testTreeGetByPath(t *testing.T, s Forest) {
|
|||
// |- cat1.jpg, Version=XXX (4)
|
||||
// |- cat1.jpg, Version=YYY (5)
|
||||
// |- cat2.jpg, Version=ZZZ (6)
|
||||
testMove(t, s, 0, 1, 0, d, treeID, "a", "")
|
||||
testMove(t, s, 1, 2, 0, d, treeID, "b", "")
|
||||
testMove(t, s, 2, 3, 1, d, treeID, "cat1.jpg", "TTT")
|
||||
testMove(t, s, 3, 4, 2, d, treeID, "cat1.jpg", "XXX")
|
||||
testMove(t, s, 4, 5, 2, d, treeID, "cat1.jpg", "YYY")
|
||||
testMove(t, s, 5, 6, 2, d, treeID, "cat2.jpg", "ZZZ")
|
||||
testMove(t, s, 0, 1, 0, cid, treeID, "a", "")
|
||||
testMove(t, s, 1, 2, 0, cid, treeID, "b", "")
|
||||
testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT")
|
||||
testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX")
|
||||
testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY")
|
||||
testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ")
|
||||
|
||||
if mf, ok := s.(*memoryForest); ok {
|
||||
single := mf.treeMap[cid.String()+"/"+treeID]
|
||||
|
@ -970,14 +953,14 @@ func testTreeGetByPath(t *testing.T, s Forest) {
|
|||
})
|
||||
}
|
||||
|
||||
func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor, treeID, filename, version string) {
|
||||
func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) {
|
||||
items := make([]KeyValue, 1, 2)
|
||||
items[0] = KeyValue{AttributeFilename, []byte(filename)}
|
||||
if version != "" {
|
||||
items = append(items, KeyValue{AttributeVersion, []byte(version)})
|
||||
}
|
||||
|
||||
require.NoError(t, s.TreeApply(d, treeID, &Move{
|
||||
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||
Parent: parent,
|
||||
Child: node,
|
||||
Meta: Meta{
|
||||
|
|
|
@ -18,7 +18,7 @@ type Forest interface {
|
|||
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
|
||||
// TreeApply applies replicated operation from another node.
|
||||
// If background is true, TreeApply will first check whether an operation exists.
|
||||
TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error
|
||||
TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
|
||||
// TreeGetByPath returns all nodes corresponding to the path.
|
||||
// The path is constructed by descending from the root using the values of the
|
||||
// AttributeFilename in meta.
|
||||
|
|
|
@ -155,7 +155,6 @@ func (s *Shard) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// nolint: funlen
|
||||
func (s *Shard) refillMetabase() error {
|
||||
err := s.metaBase.Reset()
|
||||
if err != nil {
|
||||
|
@ -172,57 +171,23 @@ func (s *Shard) refillMetabase() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// nolint: exhaustive
|
||||
var err error
|
||||
switch obj.Type() {
|
||||
case objectSDK.TypeTombstone:
|
||||
tombstone := objectSDK.NewTombstone()
|
||||
|
||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
||||
}
|
||||
|
||||
tombAddr := object.AddressOf(obj)
|
||||
memberIDs := tombstone.Members()
|
||||
tombMembers := make([]oid.Address, 0, len(memberIDs))
|
||||
|
||||
for i := range memberIDs {
|
||||
a := tombAddr
|
||||
a.SetObject(memberIDs[i])
|
||||
|
||||
tombMembers = append(tombMembers, a)
|
||||
}
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
|
||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
||||
inhumePrm.SetAddresses(tombMembers...)
|
||||
|
||||
_, err = s.metaBase.Inhume(inhumePrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not inhume objects: %w", err)
|
||||
}
|
||||
err = s.refillTombstoneObject(obj)
|
||||
case objectSDK.TypeLock:
|
||||
var lock objectSDK.Lock
|
||||
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
||||
}
|
||||
|
||||
locked := make([]oid.ID, lock.NumberOfMembers())
|
||||
lock.ReadMembers(locked)
|
||||
|
||||
cnr, _ := obj.ContainerID()
|
||||
id, _ := obj.ID()
|
||||
err = s.metaBase.Lock(cnr, id, locked)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not lock objects: %w", err)
|
||||
}
|
||||
err = s.refillLockObject(obj)
|
||||
default:
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var mPrm meta.PutPrm
|
||||
mPrm.SetObject(obj)
|
||||
mPrm.SetStorageID(descriptor)
|
||||
|
||||
_, err := s.metaBase.Put(mPrm)
|
||||
_, err = s.metaBase.Put(mPrm)
|
||||
if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||
return err
|
||||
}
|
||||
|
@ -241,6 +206,54 @@ func (s *Shard) refillMetabase() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Shard) refillLockObject(obj *objectSDK.Object) error {
|
||||
var lock objectSDK.Lock
|
||||
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
||||
}
|
||||
|
||||
locked := make([]oid.ID, lock.NumberOfMembers())
|
||||
lock.ReadMembers(locked)
|
||||
|
||||
cnr, _ := obj.ContainerID()
|
||||
id, _ := obj.ID()
|
||||
err := s.metaBase.Lock(cnr, id, locked)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not lock objects: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Shard) refillTombstoneObject(obj *objectSDK.Object) error {
|
||||
tombstone := objectSDK.NewTombstone()
|
||||
|
||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
||||
}
|
||||
|
||||
tombAddr := object.AddressOf(obj)
|
||||
memberIDs := tombstone.Members()
|
||||
tombMembers := make([]oid.Address, 0, len(memberIDs))
|
||||
|
||||
for i := range memberIDs {
|
||||
a := tombAddr
|
||||
a.SetObject(memberIDs[i])
|
||||
|
||||
tombMembers = append(tombMembers, a)
|
||||
}
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
|
||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
||||
inhumePrm.SetAddresses(tombMembers...)
|
||||
|
||||
_, err := s.metaBase.Inhume(inhumePrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not inhume objects: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close releases all Shard's components.
|
||||
func (s *Shard) Close() error {
|
||||
components := []interface{ Close() error }{}
|
||||
|
|
|
@ -42,7 +42,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
|
|||
}
|
||||
|
||||
// TreeApply implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
||||
func (s *Shard) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
||||
if s.pilorama == nil {
|
||||
return ErrPiloramaDisabled
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M
|
|||
if s.info.Mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
return s.pilorama.TreeApply(d, treeID, m, backgroundSync)
|
||||
return s.pilorama.TreeApply(cnr, treeID, m, backgroundSync)
|
||||
}
|
||||
|
||||
// TreeGetByPath implements the pilorama.Forest interface.
|
||||
|
|
|
@ -426,7 +426,7 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
|
|||
return nil
|
||||
}
|
||||
|
||||
// nolint: funlen, gocognit
|
||||
// nolint: funlen
|
||||
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
|
||||
body := req.GetBody()
|
||||
|
||||
|
@ -459,138 +459,140 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
|
|||
body: resp.GetBody(),
|
||||
})
|
||||
|
||||
if !commonPrm.LocalOnly() {
|
||||
var onceResign sync.Once
|
||||
|
||||
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
||||
var err error
|
||||
|
||||
key, err := s.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// once compose and resign forwarding request
|
||||
onceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||
// TODO: #1165 think how to set the other fields
|
||||
metaHdr.SetOrigin(meta)
|
||||
writeCurrentVersion(metaHdr)
|
||||
|
||||
req.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// code below is copy-pasted from c.GetObjectHeader implementation,
|
||||
// perhaps it is worth highlighting the utility function in frostfs-api-go
|
||||
|
||||
// send Head request
|
||||
var headResp *objectV2.HeadResponse
|
||||
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
||||
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
|
||||
// verify response key
|
||||
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(headResp); err != nil {
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
hdr *objectV2.Header
|
||||
idSig *refs.Signature
|
||||
)
|
||||
|
||||
switch v := headResp.GetBody().GetHeaderPart().(type) {
|
||||
case nil:
|
||||
return nil, fmt.Errorf("unexpected header type %T", v)
|
||||
case *objectV2.ShortHeader:
|
||||
if !body.GetMainOnly() {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
h := v
|
||||
|
||||
hdr = new(objectV2.Header)
|
||||
hdr.SetPayloadLength(h.GetPayloadLength())
|
||||
hdr.SetVersion(h.GetVersion())
|
||||
hdr.SetOwnerID(h.GetOwnerID())
|
||||
hdr.SetObjectType(h.GetObjectType())
|
||||
hdr.SetCreationEpoch(h.GetCreationEpoch())
|
||||
hdr.SetPayloadHash(h.GetPayloadHash())
|
||||
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
|
||||
case *objectV2.HeaderWithSignature:
|
||||
if body.GetMainOnly() {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
hdrWithSig := v
|
||||
if hdrWithSig == nil {
|
||||
return nil, errors.New("nil object part")
|
||||
}
|
||||
|
||||
hdr = hdrWithSig.GetHeader()
|
||||
idSig = hdrWithSig.GetSignature()
|
||||
|
||||
if idSig == nil {
|
||||
// TODO(@cthulhu-rider): #1387 use "const" error
|
||||
return nil, errors.New("missing signature")
|
||||
}
|
||||
|
||||
binID, err := objAddr.Object().Marshal()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal ID: %w", err)
|
||||
}
|
||||
|
||||
var sig frostfscrypto.Signature
|
||||
if err := sig.ReadFromV2(*idSig); err != nil {
|
||||
return nil, fmt.Errorf("can't read signature: %w", err)
|
||||
}
|
||||
|
||||
if !sig.Verify(binID) {
|
||||
return nil, errors.New("invalid object ID signature")
|
||||
}
|
||||
case *objectV2.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
}
|
||||
|
||||
objv2 := new(objectV2.Object)
|
||||
objv2.SetHeader(hdr)
|
||||
objv2.SetSignature(idSig)
|
||||
|
||||
obj := object.NewFromV2(objv2)
|
||||
obj.SetID(objAddr.Object())
|
||||
|
||||
// convert the object
|
||||
return obj, nil
|
||||
}))
|
||||
if commonPrm.LocalOnly() {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
var onceResign sync.Once
|
||||
|
||||
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
||||
var err error
|
||||
|
||||
key, err := s.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// once compose and resign forwarding request
|
||||
onceResign.Do(func() {
|
||||
// compose meta header of the local server
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||
// TODO: #1165 think how to set the other fields
|
||||
metaHdr.SetOrigin(meta)
|
||||
writeCurrentVersion(metaHdr)
|
||||
|
||||
req.SetMetaHeader(metaHdr)
|
||||
|
||||
err = signature.SignServiceMessage(key, req)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// code below is copy-pasted from c.GetObjectHeader implementation,
|
||||
// perhaps it is worth highlighting the utility function in frostfs-api-go
|
||||
|
||||
// send Head request
|
||||
var headResp *objectV2.HeadResponse
|
||||
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
||||
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
|
||||
// verify response key
|
||||
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(headResp); err != nil {
|
||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
hdr *objectV2.Header
|
||||
idSig *refs.Signature
|
||||
)
|
||||
|
||||
switch v := headResp.GetBody().GetHeaderPart().(type) {
|
||||
case nil:
|
||||
return nil, fmt.Errorf("unexpected header type %T", v)
|
||||
case *objectV2.ShortHeader:
|
||||
if !body.GetMainOnly() {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
h := v
|
||||
|
||||
hdr = new(objectV2.Header)
|
||||
hdr.SetPayloadLength(h.GetPayloadLength())
|
||||
hdr.SetVersion(h.GetVersion())
|
||||
hdr.SetOwnerID(h.GetOwnerID())
|
||||
hdr.SetObjectType(h.GetObjectType())
|
||||
hdr.SetCreationEpoch(h.GetCreationEpoch())
|
||||
hdr.SetPayloadHash(h.GetPayloadHash())
|
||||
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
|
||||
case *objectV2.HeaderWithSignature:
|
||||
if body.GetMainOnly() {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||
)
|
||||
}
|
||||
|
||||
hdrWithSig := v
|
||||
if hdrWithSig == nil {
|
||||
return nil, errors.New("nil object part")
|
||||
}
|
||||
|
||||
hdr = hdrWithSig.GetHeader()
|
||||
idSig = hdrWithSig.GetSignature()
|
||||
|
||||
if idSig == nil {
|
||||
// TODO(@cthulhu-rider): #1387 use "const" error
|
||||
return nil, errors.New("missing signature")
|
||||
}
|
||||
|
||||
binID, err := objAddr.Object().Marshal()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal ID: %w", err)
|
||||
}
|
||||
|
||||
var sig frostfscrypto.Signature
|
||||
if err := sig.ReadFromV2(*idSig); err != nil {
|
||||
return nil, fmt.Errorf("can't read signature: %w", err)
|
||||
}
|
||||
|
||||
if !sig.Verify(binID) {
|
||||
return nil, errors.New("invalid object ID signature")
|
||||
}
|
||||
case *objectV2.SplitInfo:
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
}
|
||||
|
||||
objv2 := new(objectV2.Object)
|
||||
objv2.SetHeader(hdr)
|
||||
objv2.SetSignature(idSig)
|
||||
|
||||
obj := object.NewFromV2(objv2)
|
||||
obj.SetID(objAddr.Object())
|
||||
|
||||
// convert the object
|
||||
return obj, nil
|
||||
}))
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ type replicationTask struct {
|
|||
|
||||
type applyOp struct {
|
||||
treeID string
|
||||
pilorama.CIDDescriptor
|
||||
cid cidSDK.ID
|
||||
pilorama.Move
|
||||
}
|
||||
|
||||
|
@ -43,7 +43,7 @@ func (s *Service) localReplicationWorker() {
|
|||
case <-s.closeCh:
|
||||
return
|
||||
case op := <-s.replicateLocalCh:
|
||||
err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
|
||||
err := s.forest.TreeApply(op.cid, op.treeID, &op.Move, false)
|
||||
if err != nil {
|
||||
s.log.Error("failed to apply replicated operation",
|
||||
zap.String("err", err.Error()))
|
||||
|
@ -52,7 +52,7 @@ func (s *Service) localReplicationWorker() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) replicationWorker() {
|
||||
func (s *Service) replicationWorker(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
|
@ -64,13 +64,13 @@ func (s *Service) replicationWorker() {
|
|||
task.n.IterateNetworkEndpoints(func(addr string) bool {
|
||||
lastAddr = addr
|
||||
|
||||
c, err := s.cache.get(context.Background(), addr)
|
||||
c, err := s.cache.get(ctx, addr)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("can't create client: %w", err)
|
||||
return false
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.replicatorTimeout)
|
||||
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
|
||||
_, lastErr = c.Apply(ctx, task.req)
|
||||
cancel()
|
||||
|
||||
|
@ -94,8 +94,7 @@ func (s *Service) replicationWorker() {
|
|||
|
||||
func (s *Service) replicateLoop(ctx context.Context) {
|
||||
for i := 0; i < s.replicatorWorkerCount; i++ {
|
||||
//nolint: contextcheck
|
||||
go s.replicationWorker()
|
||||
go s.replicationWorker(ctx)
|
||||
go s.localReplicationWorker()
|
||||
}
|
||||
defer func() {
|
||||
|
|
|
@ -468,7 +468,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
|||
|
||||
key := req.GetSignature().GetKey()
|
||||
|
||||
_, pos, size, err := s.getContainerInfo(cid, key)
|
||||
_, pos, _, err := s.getContainerInfo(cid, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -485,8 +485,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
|||
|
||||
select {
|
||||
case s.replicateLocalCh <- applyOp{
|
||||
treeID: req.GetBody().GetTreeId(),
|
||||
CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
|
||||
treeID: req.GetBody().GetTreeId(),
|
||||
cid: cid,
|
||||
Move: pilorama.Move{
|
||||
Parent: op.GetParentId(),
|
||||
Child: op.GetChildId(),
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
|
@ -40,11 +41,6 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
|
|||
return ErrNotInContainer
|
||||
}
|
||||
|
||||
var d pilorama.CIDDescriptor
|
||||
d.CID = cid
|
||||
d.Position = pos
|
||||
d.Size = len(nodes)
|
||||
|
||||
nodes = randomizeNodeOrder(nodes, pos)
|
||||
if len(nodes) == 0 {
|
||||
return nil
|
||||
|
@ -87,18 +83,18 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
|
|||
}
|
||||
|
||||
for _, tid := range treesToSync {
|
||||
h, err := s.forest.TreeLastSyncHeight(d.CID, tid)
|
||||
h, err := s.forest.TreeLastSyncHeight(cid, tid)
|
||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
s.log.Warn("could not get last synchronized height for a tree",
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("tree", tid))
|
||||
continue
|
||||
}
|
||||
newHeight := s.synchronizeTree(ctx, d, h, tid, nodes)
|
||||
newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes)
|
||||
if h < newHeight {
|
||||
if err := s.forest.TreeUpdateLastSyncHeight(d.CID, tid, newHeight); err != nil {
|
||||
if err := s.forest.TreeUpdateLastSyncHeight(cid, tid, newHeight); err != nil {
|
||||
s.log.Warn("could not update last synchronized height for a tree",
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("tree", tid))
|
||||
}
|
||||
}
|
||||
|
@ -118,24 +114,19 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
|
|||
return ErrNotInContainer
|
||||
}
|
||||
|
||||
var d pilorama.CIDDescriptor
|
||||
d.CID = cid
|
||||
d.Position = pos
|
||||
d.Size = len(nodes)
|
||||
|
||||
nodes = randomizeNodeOrder(nodes, pos)
|
||||
if len(nodes) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.synchronizeTree(ctx, d, 0, treeID, nodes)
|
||||
s.synchronizeTree(ctx, cid, 0, treeID, nodes)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, from uint64,
|
||||
func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint64,
|
||||
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
|
||||
s.log.Debug("synchronize tree",
|
||||
zap.Stringer("cid", d.CID),
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("tree", treeID),
|
||||
zap.Uint64("from", from))
|
||||
|
||||
|
@ -157,7 +148,7 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
|
|||
|
||||
treeClient := NewTreeServiceClient(cc)
|
||||
for {
|
||||
h, err := s.synchronizeSingle(ctx, d, treeID, height, treeClient)
|
||||
h, err := s.synchronizeSingle(ctx, cid, treeID, height, treeClient)
|
||||
if height < h {
|
||||
height = h
|
||||
}
|
||||
|
@ -179,9 +170,9 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
|
|||
return newHeight
|
||||
}
|
||||
|
||||
func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescriptor, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
|
||||
func (s *Service) synchronizeSingle(ctx context.Context, cid cidSDK.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
d.CID.Encode(rawCID)
|
||||
cid.Encode(rawCID)
|
||||
|
||||
for {
|
||||
newHeight := height
|
||||
|
@ -211,7 +202,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto
|
|||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||
return newHeight, err
|
||||
}
|
||||
if err := s.forest.TreeApply(d, treeID, m, true); err != nil {
|
||||
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
|
||||
return newHeight, err
|
||||
}
|
||||
if m.Time > newHeight {
|
||||
|
@ -255,7 +246,6 @@ func (s *Service) SynchronizeAll() error {
|
|||
}
|
||||
}
|
||||
|
||||
// nolint: funlen, gocognit
|
||||
func (s *Service) syncLoop(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
|
@ -272,86 +262,99 @@ func (s *Service) syncLoop(ctx context.Context) {
|
|||
continue
|
||||
}
|
||||
|
||||
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
|
||||
cnrsToSync := make([]cid.ID, 0, len(cnrs))
|
||||
newMap, cnrsToSync := s.containersToSync(cnrs)
|
||||
|
||||
var removed []cid.ID
|
||||
for _, cnr := range cnrs {
|
||||
_, pos, err := s.getContainerNodes(cnr)
|
||||
if err != nil {
|
||||
s.log.Error("could not calculate container nodes",
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
s.syncContainers(ctx, cnrsToSync)
|
||||
|
||||
if pos < 0 {
|
||||
// node is not included in the container.
|
||||
continue
|
||||
}
|
||||
|
||||
newMap[cnr] = struct{}{}
|
||||
cnrsToSync = append(cnrsToSync, cnr)
|
||||
}
|
||||
|
||||
// sync new containers
|
||||
var wg sync.WaitGroup
|
||||
for _, cnr := range cnrsToSync {
|
||||
wg.Add(1)
|
||||
cnr := cnr
|
||||
err := s.syncPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
|
||||
|
||||
err := s.synchronizeAllTrees(ctx, cnr)
|
||||
if err != nil {
|
||||
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
|
||||
})
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
s.log.Error("could not query trees for synchronization",
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.Error(err))
|
||||
if errors.Is(err, ants.ErrPoolClosed) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
s.cnrMapMtx.Lock()
|
||||
for cnr := range s.cnrMap {
|
||||
if _, ok := newMap[cnr]; ok {
|
||||
continue
|
||||
}
|
||||
removed = append(removed, cnr)
|
||||
}
|
||||
for i := range removed {
|
||||
delete(s.cnrMap, removed[i])
|
||||
}
|
||||
s.cnrMapMtx.Unlock()
|
||||
|
||||
for _, cnr := range removed {
|
||||
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
|
||||
|
||||
err = s.DropTree(ctx, cnr, "")
|
||||
if err != nil {
|
||||
s.log.Error("could not remove redundant tree",
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
s.removeContainers(ctx, newMap)
|
||||
|
||||
s.log.Debug("trees have been synchronized")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
|
||||
// sync new containers
|
||||
var wg sync.WaitGroup
|
||||
for _, cnr := range cnrs {
|
||||
wg.Add(1)
|
||||
cnr := cnr
|
||||
err := s.syncPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
|
||||
|
||||
err := s.synchronizeAllTrees(ctx, cnr)
|
||||
if err != nil {
|
||||
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
|
||||
})
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
s.log.Error("could not query trees for synchronization",
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.Error(err))
|
||||
if errors.Is(err, ants.ErrPoolClosed) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
|
||||
s.cnrMapMtx.Lock()
|
||||
defer s.cnrMapMtx.Unlock()
|
||||
|
||||
var removed []cid.ID
|
||||
for cnr := range s.cnrMap {
|
||||
if _, ok := newContainers[cnr]; ok {
|
||||
continue
|
||||
}
|
||||
removed = append(removed, cnr)
|
||||
}
|
||||
for i := range removed {
|
||||
delete(s.cnrMap, removed[i])
|
||||
}
|
||||
|
||||
for _, cnr := range removed {
|
||||
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
|
||||
|
||||
err := s.DropTree(ctx, cnr, "")
|
||||
if err != nil {
|
||||
s.log.Error("could not remove redundant tree",
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) containersToSync(cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) {
|
||||
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
|
||||
cnrsToSync := make([]cid.ID, 0, len(cnrs))
|
||||
|
||||
for _, cnr := range cnrs {
|
||||
_, pos, err := s.getContainerNodes(cnr)
|
||||
if err != nil {
|
||||
s.log.Error("could not calculate container nodes",
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
if pos < 0 {
|
||||
// node is not included in the container.
|
||||
continue
|
||||
}
|
||||
|
||||
newMap[cnr] = struct{}{}
|
||||
cnrsToSync = append(cnrsToSync, cnr)
|
||||
}
|
||||
return newMap, cnrsToSync
|
||||
}
|
||||
|
||||
// randomizeNodeOrder shuffles nodes and removes not a `pos` index.
|
||||
// It is assumed that 0 <= pos < len(nodes).
|
||||
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
|
||||
|
|
Loading…
Add table
Reference in a new issue
looks redundant
continue
orlog
?Fixed.
Fixed.
Fixed.
Fixed.
Fixed.
Fixed.
Fixed.
Fixed.
nice thread, awesome comments