Fix some of the linter exceptions #156
13 changed files with 460 additions and 444 deletions
|
@ -44,7 +44,6 @@ const (
|
||||||
notaryEnabled = true
|
notaryEnabled = true
|
||||||
)
|
)
|
||||||
|
|
||||||
// nolint: funlen, gocognit
|
|
||||||
func dumpBalances(cmd *cobra.Command, _ []string) error {
|
func dumpBalances(cmd *cobra.Command, _ []string) error {
|
||||||
var (
|
var (
|
||||||
dumpStorage, _ = cmd.Flags().GetBool(dumpBalancesStorageFlag)
|
dumpStorage, _ = cmd.Flags().GetBool(dumpBalancesStorageFlag)
|
||||||
|
@ -84,86 +83,110 @@ func dumpBalances(cmd *cobra.Command, _ []string) error {
|
||||||
printBalances(cmd, "Inner ring nodes balances:", irList)
|
printBalances(cmd, "Inner ring nodes balances:", irList)
|
||||||
|
|
||||||
if dumpStorage {
|
if dumpStorage {
|
||||||
arr, err := unwrap.Array(inv.Call(nmHash, "netmap"))
|
if err := printStorageNodeBalances(cmd, inv, nmHash); err != nil {
|
||||||
if err != nil {
|
|
||||||
return errors.New("can't fetch the list of storage nodes")
|
|
||||||
}
|
|
||||||
|
|
||||||
snList := make([]accBalancePair, len(arr))
|
|
||||||
for i := range arr {
|
|
||||||
node, ok := arr[i].Value().([]stackitem.Item)
|
|
||||||
if !ok || len(node) == 0 {
|
|
||||||
return errors.New("can't parse the list of storage nodes")
|
|
||||||
}
|
|
||||||
bs, err := node[0].TryBytes()
|
|
||||||
if err != nil {
|
|
||||||
return errors.New("can't parse the list of storage nodes")
|
|
||||||
}
|
|
||||||
var ni netmap.NodeInfo
|
|
||||||
if err := ni.Unmarshal(bs); err != nil {
|
|
||||||
return fmt.Errorf("can't parse the list of storage nodes: %w", err)
|
|
||||||
}
|
|
||||||
pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't parse storage node public key: %w", err)
|
|
||||||
}
|
|
||||||
snList[i].scriptHash = pub.GetScriptHash()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := fetchBalances(inv, gas.Hash, snList); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
printBalances(cmd, "\nStorage node balances:", snList)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if dumpProxy {
|
if dumpProxy {
|
||||||
h, err := nnsResolveHash(inv, nnsCs.Hash, proxyContract+".frostfs")
|
if err := printProxyContractBalance(cmd, inv, nnsCs.Hash); err != nil {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't get hash of the proxy contract: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
proxyList := []accBalancePair{{scriptHash: h}}
|
|
||||||
if err := fetchBalances(inv, gas.Hash, proxyList); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
printBalances(cmd, "\nProxy contract balance:", proxyList)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if dumpAlphabet {
|
if dumpAlphabet {
|
||||||
alphaList := make([]accBalancePair, len(irList))
|
if err := printAlphabetContractBalances(cmd, c, inv, len(irList), nnsCs.Hash); err != nil {
|
||||||
|
|
||||||
w := io.NewBufBinWriter()
|
|
||||||
for i := range alphaList {
|
|
||||||
emit.AppCall(w.BinWriter, nnsCs.Hash, "resolve", callflag.ReadOnly,
|
|
||||||
getAlphabetNNSDomain(i),
|
|
||||||
int64(nns.TXT))
|
|
||||||
}
|
|
||||||
if w.Err != nil {
|
|
||||||
panic(w.Err)
|
|
||||||
}
|
|
||||||
|
|
||||||
alphaRes, err := c.InvokeScript(w.Bytes(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't fetch info from NNS: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range alphaList {
|
|
||||||
h, err := parseNNSResolveResult(alphaRes.Stack[i])
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err)
|
|
||||||
}
|
|
||||||
alphaList[i].scriptHash = h
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := fetchBalances(inv, gas.Hash, alphaList); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
printBalances(cmd, "\nAlphabet contracts balances:", alphaList)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func printStorageNodeBalances(cmd *cobra.Command, inv *invoker.Invoker, nmHash util.Uint160) error {
|
||||||
|
arr, err := unwrap.Array(inv.Call(nmHash, "netmap"))
|
||||||
|
if err != nil {
|
||||||
|
return errors.New("can't fetch the list of storage nodes")
|
||||||
|
}
|
||||||
|
|
||||||
|
snList := make([]accBalancePair, len(arr))
|
||||||
|
for i := range arr {
|
||||||
|
node, ok := arr[i].Value().([]stackitem.Item)
|
||||||
|
if !ok || len(node) == 0 {
|
||||||
|
return errors.New("can't parse the list of storage nodes")
|
||||||
|
}
|
||||||
|
bs, err := node[0].TryBytes()
|
||||||
|
if err != nil {
|
||||||
|
return errors.New("can't parse the list of storage nodes")
|
||||||
|
}
|
||||||
|
var ni netmap.NodeInfo
|
||||||
|
if err := ni.Unmarshal(bs); err != nil {
|
||||||
|
return fmt.Errorf("can't parse the list of storage nodes: %w", err)
|
||||||
|
}
|
||||||
|
pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't parse storage node public key: %w", err)
|
||||||
|
}
|
||||||
|
snList[i].scriptHash = pub.GetScriptHash()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fetchBalances(inv, gas.Hash, snList); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
printBalances(cmd, "\nStorage node balances:", snList)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func printProxyContractBalance(cmd *cobra.Command, inv *invoker.Invoker, nnsHash util.Uint160) error {
|
||||||
|
h, err := nnsResolveHash(inv, nnsHash, proxyContract+".frostfs")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't get hash of the proxy contract: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
proxyList := []accBalancePair{{scriptHash: h}}
|
||||||
|
if err := fetchBalances(inv, gas.Hash, proxyList); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
printBalances(cmd, "\nProxy contract balance:", proxyList)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func printAlphabetContractBalances(cmd *cobra.Command, c Client, inv *invoker.Invoker, count int, nnsHash util.Uint160) error {
|
||||||
|
alphaList := make([]accBalancePair, count)
|
||||||
|
|
||||||
|
w := io.NewBufBinWriter()
|
||||||
|
for i := range alphaList {
|
||||||
|
emit.AppCall(w.BinWriter, nnsHash, "resolve", callflag.ReadOnly,
|
||||||
|
getAlphabetNNSDomain(i),
|
||||||
|
int64(nns.TXT))
|
||||||
|
}
|
||||||
|
if w.Err != nil {
|
||||||
|
panic(w.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
alphaRes, err := c.InvokeScript(w.Bytes(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't fetch info from NNS: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range alphaList {
|
||||||
|
h, err := parseNNSResolveResult(alphaRes.Stack[i])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err)
|
||||||
|
}
|
||||||
|
alphaList[i].scriptHash = h
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fetchBalances(inv, gas.Hash, alphaList); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
printBalances(cmd, "\nAlphabet contracts balances:", alphaList)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func fetchIRNodes(c Client, nmHash, desigHash util.Uint160) ([]accBalancePair, error) {
|
func fetchIRNodes(c Client, nmHash, desigHash util.Uint160) ([]accBalancePair, error) {
|
||||||
var irList []accBalancePair
|
var irList []accBalancePair
|
||||||
|
|
||||||
|
|
|
@ -51,17 +51,17 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeApply implements the pilorama.Forest interface.
|
// TreeApply implements the pilorama.Forest interface.
|
||||||
func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
func (e *StorageEngine) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
||||||
index, lst, err := e.getTreeShard(d.CID, treeID)
|
index, lst, err := e.getTreeShard(cnr, treeID)
|
||||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = lst[index].TreeApply(d, treeID, m, backgroundSync)
|
err = lst[index].TreeApply(cnr, treeID, m, backgroundSync)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
||||||
e.reportShardError(lst[index], "can't perform `TreeApply`", err,
|
e.reportShardError(lst[index], "can't perform `TreeApply`", err,
|
||||||
zap.Stringer("cid", d.CID),
|
zap.Stringer("cid", cnr),
|
||||||
zap.String("tree", treeID))
|
zap.String("tree", treeID))
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -138,57 +138,57 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
|
||||||
// Operation is very resource-intensive, which is caused by the admissibility
|
// Operation is very resource-intensive, which is caused by the admissibility
|
||||||
// of multiple locks. Also, if we knew what objects are locked, it would be
|
// of multiple locks. Also, if we knew what objects are locked, it would be
|
||||||
// possible to speed up the execution.
|
// possible to speed up the execution.
|
||||||
//
|
|
||||||
// nolint: gocognit
|
|
||||||
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
|
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
|
||||||
bucketLocked := tx.Bucket(bucketNameLocked)
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
||||||
if bucketLocked != nil {
|
if bucketLocked == nil {
|
||||||
key := make([]byte, cidSize)
|
return nil
|
||||||
idCnr.Encode(key)
|
}
|
||||||
|
|
||||||
bucketLockedContainer := bucketLocked.Bucket(key)
|
key := make([]byte, cidSize)
|
||||||
if bucketLockedContainer != nil {
|
idCnr.Encode(key)
|
||||||
keyLocker := objectKey(locker, key)
|
|
||||||
return bucketLockedContainer.ForEach(func(k, v []byte) error {
|
|
||||||
keyLockers, err := decodeList(v)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range keyLockers {
|
bucketLockedContainer := bucketLocked.Bucket(key)
|
||||||
if bytes.Equal(keyLockers[i], keyLocker) {
|
if bucketLockedContainer == nil {
|
||||||
if len(keyLockers) == 1 {
|
return nil
|
||||||
// locker was all alone
|
}
|
||||||
err = bucketLockedContainer.Delete(k)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// exclude locker
|
|
||||||
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
|
|
||||||
|
|
||||||
v, err = encodeList(keyLockers)
|
keyLocker := objectKey(locker, key)
|
||||||
if err != nil {
|
return bucketLockedContainer.ForEach(func(k, v []byte) error {
|
||||||
return fmt.Errorf("encode updated list of lockers: %w", err)
|
keyLockers, err := decodeList(v)
|
||||||
}
|
if err != nil {
|
||||||
|
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// update the record
|
for i := range keyLockers {
|
||||||
err = bucketLockedContainer.Put(k, v)
|
if bytes.Equal(keyLockers[i], keyLocker) {
|
||||||
if err != nil {
|
if len(keyLockers) == 1 {
|
||||||
return fmt.Errorf("update list of lockers: %w", err)
|
// locker was all alone
|
||||||
}
|
err = bucketLockedContainer.Delete(k)
|
||||||
}
|
if err != nil {
|
||||||
|
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// exclude locker
|
||||||
|
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
|
||||||
|
|
||||||
return nil
|
v, err = encodeList(keyLockers)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("encode updated list of lockers: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// update the record
|
||||||
|
err = bucketLockedContainer.Put(k, v)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("update list of lockers: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsLockedPrm groups the parameters of IsLocked operation.
|
// IsLockedPrm groups the parameters of IsLocked operation.
|
||||||
|
|
|
@ -327,11 +327,7 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeApply implements the Forest interface.
|
// TreeApply implements the Forest interface.
|
||||||
func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error {
|
func (t *boltForest) TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error {
|
||||||
if !d.checkValid() {
|
|
||||||
return ErrInvalidCIDDescriptor
|
|
||||||
}
|
|
||||||
|
|
||||||
t.modeMtx.RLock()
|
t.modeMtx.RLock()
|
||||||
defer t.modeMtx.RUnlock()
|
defer t.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
@ -344,7 +340,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
||||||
if backgroundSync {
|
if backgroundSync {
|
||||||
var seen bool
|
var seen bool
|
||||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||||
treeRoot := tx.Bucket(bucketName(d.CID, treeID))
|
treeRoot := tx.Bucket(bucketName(cnr, treeID))
|
||||||
if treeRoot == nil {
|
if treeRoot == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -362,7 +358,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
||||||
}
|
}
|
||||||
|
|
||||||
if t.db.MaxBatchSize == 1 {
|
if t.db.MaxBatchSize == 1 {
|
||||||
fullID := bucketName(d.CID, treeID)
|
fullID := bucketName(cnr, treeID)
|
||||||
return t.db.Update(func(tx *bbolt.Tx) error {
|
return t.db.Update(func(tx *bbolt.Tx) error {
|
||||||
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
|
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -375,11 +371,11 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan error, 1)
|
ch := make(chan error, 1)
|
||||||
t.addBatch(d, treeID, m, ch)
|
t.addBatch(cnr, treeID, m, ch)
|
||||||
return <-ch
|
return <-ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
|
func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
|
||||||
t.mtx.Lock()
|
t.mtx.Lock()
|
||||||
for i := 0; i < len(t.batches); i++ {
|
for i := 0; i < len(t.batches); i++ {
|
||||||
t.batches[i].mtx.Lock()
|
t.batches[i].mtx.Lock()
|
||||||
|
@ -391,7 +387,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
|
found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID
|
||||||
if found {
|
if found {
|
||||||
t.batches[i].results = append(t.batches[i].results, ch)
|
t.batches[i].results = append(t.batches[i].results, ch)
|
||||||
t.batches[i].operations = append(t.batches[i].operations, m)
|
t.batches[i].operations = append(t.batches[i].operations, m)
|
||||||
|
@ -412,7 +408,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
|
||||||
}
|
}
|
||||||
b := &batch{
|
b := &batch{
|
||||||
forest: t,
|
forest: t,
|
||||||
cid: d.CID,
|
cid: cnr,
|
||||||
treeID: treeID,
|
treeID: treeID,
|
||||||
results: []chan<- error{ch},
|
results: []chan<- error{ch},
|
||||||
operations: []*Move{m},
|
operations: []*Move{m},
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -93,12 +94,8 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeApply implements the Forest interface.
|
// TreeApply implements the Forest interface.
|
||||||
func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move, _ bool) error {
|
func (f *memoryForest) TreeApply(cnr cid.ID, treeID string, op *Move, _ bool) error {
|
||||||
if !d.checkValid() {
|
fullID := cnr.String() + "/" + treeID
|
||||||
return ErrInvalidCIDDescriptor
|
|
||||||
}
|
|
||||||
|
|
||||||
fullID := d.CID.String() + "/" + treeID
|
|
||||||
s, ok := f.treeMap[fullID]
|
s, ok := f.treeMap[fullID]
|
||||||
if !ok {
|
if !ok {
|
||||||
s = newState()
|
s = newState()
|
||||||
|
|
|
@ -411,21 +411,10 @@ func TestForest_Apply(t *testing.T) {
|
||||||
|
|
||||||
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
|
|
||||||
t.Run("invalid descriptor", func(t *testing.T) {
|
|
||||||
s := constructor(t)
|
|
||||||
err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{
|
|
||||||
Child: 10,
|
|
||||||
Parent: 0,
|
|
||||||
Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}},
|
|
||||||
}, false)
|
|
||||||
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
|
|
||||||
})
|
|
||||||
|
|
||||||
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
|
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &Move{
|
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||||
Child: child,
|
Child: child,
|
||||||
Parent: parent,
|
Parent: parent,
|
||||||
Meta: meta,
|
Meta: meta,
|
||||||
|
@ -465,7 +454,6 @@ func TestForest_GetOpLog(t *testing.T) {
|
||||||
|
|
||||||
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
logs := []Move{
|
logs := []Move{
|
||||||
{
|
{
|
||||||
|
@ -491,7 +479,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Op
|
||||||
})
|
})
|
||||||
|
|
||||||
for i := range logs {
|
for i := range logs {
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &logs[i], false))
|
require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
testGetOpLog := func(t *testing.T, height uint64, m Move) {
|
testGetOpLog := func(t *testing.T, height uint64, m Move) {
|
||||||
|
@ -533,13 +521,12 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
|
||||||
|
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
|
|
||||||
t.Run("empty state, no panic", func(t *testing.T) {
|
t.Run("empty state, no panic", func(t *testing.T) {
|
||||||
checkExists(t, false, cid, treeID)
|
checkExists(t, false, cid, treeID)
|
||||||
})
|
})
|
||||||
|
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1}, false))
|
require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
|
||||||
checkExists(t, true, cid, treeID)
|
checkExists(t, true, cid, treeID)
|
||||||
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
|
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
|
||||||
checkExists(t, false, cid, "another tree") // same CID, different tree
|
checkExists(t, false, cid, "another tree") // same CID, different tree
|
||||||
|
@ -570,16 +557,16 @@ func TestApplyTricky1(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
|
cid := cidtest.ID()
|
||||||
for i := range providers {
|
for i := range providers {
|
||||||
t.Run(providers[i].name, func(t *testing.T) {
|
t.Run(providers[i].name, func(t *testing.T) {
|
||||||
s := providers[i].construct(t)
|
s := providers[i].construct(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range expected {
|
for i := range expected {
|
||||||
_, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
|
_, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expected[i].parent, parent)
|
require.Equal(t, expected[i].parent, parent)
|
||||||
}
|
}
|
||||||
|
@ -631,16 +618,16 @@ func TestApplyTricky2(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
|
cid := cidtest.ID()
|
||||||
for i := range providers {
|
for i := range providers {
|
||||||
t.Run(providers[i].name, func(t *testing.T) {
|
t.Run(providers[i].name, func(t *testing.T) {
|
||||||
s := providers[i].construct(t)
|
s := providers[i].construct(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range expected {
|
for i := range expected {
|
||||||
_, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
|
_, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expected[i].parent, parent)
|
require.Equal(t, expected[i].parent, parent)
|
||||||
}
|
}
|
||||||
|
@ -746,12 +733,11 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
|
||||||
ops := prepareRandomTree(nodeCount, opCount)
|
ops := prepareRandomTree(nodeCount, opCount)
|
||||||
|
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
|
|
||||||
expected := constructor(t)
|
expected := constructor(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < iterCount; i++ {
|
for i := 0; i < iterCount; i++ {
|
||||||
|
@ -766,7 +752,7 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for op := range ch {
|
for op := range ch {
|
||||||
require.NoError(t, actual.TreeApply(d, treeID, op, false))
|
require.NoError(t, actual.TreeApply(cid, treeID, op, false))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -792,12 +778,11 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
|
||||||
ops := prepareRandomTree(nodeCount, opCount)
|
ops := prepareRandomTree(nodeCount, opCount)
|
||||||
|
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
|
|
||||||
expected := constructor(t)
|
expected := constructor(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
|
|
||||||
const iterCount = 200
|
const iterCount = 200
|
||||||
|
@ -807,7 +792,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
|
||||||
|
|
||||||
actual := constructor(t)
|
actual := constructor(t)
|
||||||
for i := range ops {
|
for i := range ops {
|
||||||
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
|
require.NoError(t, actual.TreeApply(cid, treeID, &ops[i], false))
|
||||||
}
|
}
|
||||||
compareForests(t, expected, actual, cid, treeID, nodeCount)
|
compareForests(t, expected, actual, cid, treeID, nodeCount)
|
||||||
}
|
}
|
||||||
|
@ -889,7 +874,6 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
||||||
|
|
||||||
ops := genFunc(b.N)
|
ops := genFunc(b.N)
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
ch := make(chan int, b.N)
|
ch := make(chan int, b.N)
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
@ -901,7 +885,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
||||||
b.SetParallelism(10)
|
b.SetParallelism(10)
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
|
if err := s.TreeApply(cid, treeID, &ops[<-ch], false); err != nil {
|
||||||
b.Fatalf("error in `Apply`: %v", err)
|
b.Fatalf("error in `Apply`: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -918,7 +902,6 @@ func TestTreeGetByPath(t *testing.T) {
|
||||||
|
|
||||||
func testTreeGetByPath(t *testing.T, s Forest) {
|
func testTreeGetByPath(t *testing.T, s Forest) {
|
||||||
cid := cidtest.ID()
|
cid := cidtest.ID()
|
||||||
d := CIDDescriptor{cid, 0, 1}
|
|
||||||
treeID := "version"
|
treeID := "version"
|
||||||
|
|
||||||
// /
|
// /
|
||||||
|
@ -928,12 +911,12 @@ func testTreeGetByPath(t *testing.T, s Forest) {
|
||||||
// |- cat1.jpg, Version=XXX (4)
|
// |- cat1.jpg, Version=XXX (4)
|
||||||
// |- cat1.jpg, Version=YYY (5)
|
// |- cat1.jpg, Version=YYY (5)
|
||||||
// |- cat2.jpg, Version=ZZZ (6)
|
// |- cat2.jpg, Version=ZZZ (6)
|
||||||
testMove(t, s, 0, 1, 0, d, treeID, "a", "")
|
testMove(t, s, 0, 1, 0, cid, treeID, "a", "")
|
||||||
testMove(t, s, 1, 2, 0, d, treeID, "b", "")
|
testMove(t, s, 1, 2, 0, cid, treeID, "b", "")
|
||||||
testMove(t, s, 2, 3, 1, d, treeID, "cat1.jpg", "TTT")
|
testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT")
|
||||||
testMove(t, s, 3, 4, 2, d, treeID, "cat1.jpg", "XXX")
|
testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX")
|
||||||
testMove(t, s, 4, 5, 2, d, treeID, "cat1.jpg", "YYY")
|
testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY")
|
||||||
testMove(t, s, 5, 6, 2, d, treeID, "cat2.jpg", "ZZZ")
|
testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ")
|
||||||
|
|
||||||
if mf, ok := s.(*memoryForest); ok {
|
if mf, ok := s.(*memoryForest); ok {
|
||||||
single := mf.treeMap[cid.String()+"/"+treeID]
|
single := mf.treeMap[cid.String()+"/"+treeID]
|
||||||
|
@ -970,14 +953,14 @@ func testTreeGetByPath(t *testing.T, s Forest) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor, treeID, filename, version string) {
|
func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) {
|
||||||
items := make([]KeyValue, 1, 2)
|
items := make([]KeyValue, 1, 2)
|
||||||
items[0] = KeyValue{AttributeFilename, []byte(filename)}
|
items[0] = KeyValue{AttributeFilename, []byte(filename)}
|
||||||
if version != "" {
|
if version != "" {
|
||||||
items = append(items, KeyValue{AttributeVersion, []byte(version)})
|
items = append(items, KeyValue{AttributeVersion, []byte(version)})
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, s.TreeApply(d, treeID, &Move{
|
require.NoError(t, s.TreeApply(cid, treeID, &Move{
|
||||||
Parent: parent,
|
Parent: parent,
|
||||||
Child: node,
|
Child: node,
|
||||||
Meta: Meta{
|
Meta: Meta{
|
||||||
|
|
|
@ -18,7 +18,7 @@ type Forest interface {
|
||||||
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
|
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
|
||||||
// TreeApply applies replicated operation from another node.
|
// TreeApply applies replicated operation from another node.
|
||||||
// If background is true, TreeApply will first check whether an operation exists.
|
// If background is true, TreeApply will first check whether an operation exists.
|
||||||
TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error
|
TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
|
||||||
// TreeGetByPath returns all nodes corresponding to the path.
|
// TreeGetByPath returns all nodes corresponding to the path.
|
||||||
// The path is constructed by descending from the root using the values of the
|
// The path is constructed by descending from the root using the values of the
|
||||||
// AttributeFilename in meta.
|
// AttributeFilename in meta.
|
||||||
|
|
|
@ -155,7 +155,6 @@ func (s *Shard) Init() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: funlen
|
|
||||||
func (s *Shard) refillMetabase() error {
|
func (s *Shard) refillMetabase() error {
|
||||||
err := s.metaBase.Reset()
|
err := s.metaBase.Reset()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -172,57 +171,23 @@ func (s *Shard) refillMetabase() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: exhaustive
|
var err error
|
||||||
switch obj.Type() {
|
switch obj.Type() {
|
||||||
case objectSDK.TypeTombstone:
|
case objectSDK.TypeTombstone:
|
||||||
tombstone := objectSDK.NewTombstone()
|
err = s.refillTombstoneObject(obj)
|
||||||
|
|
||||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
|
||||||
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
tombAddr := object.AddressOf(obj)
|
|
||||||
memberIDs := tombstone.Members()
|
|
||||||
tombMembers := make([]oid.Address, 0, len(memberIDs))
|
|
||||||
|
|
||||||
for i := range memberIDs {
|
|
||||||
a := tombAddr
|
|
||||||
a.SetObject(memberIDs[i])
|
|
||||||
|
|
||||||
tombMembers = append(tombMembers, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
var inhumePrm meta.InhumePrm
|
|
||||||
|
|
||||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
|
||||||
inhumePrm.SetAddresses(tombMembers...)
|
|
||||||
|
|
||||||
_, err = s.metaBase.Inhume(inhumePrm)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not inhume objects: %w", err)
|
|
||||||
}
|
|
||||||
case objectSDK.TypeLock:
|
case objectSDK.TypeLock:
|
||||||
var lock objectSDK.Lock
|
err = s.refillLockObject(obj)
|
||||||
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
default:
|
||||||
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
}
|
||||||
}
|
if err != nil {
|
||||||
|
return err
|
||||||
locked := make([]oid.ID, lock.NumberOfMembers())
|
|
||||||
lock.ReadMembers(locked)
|
|
||||||
|
|
||||||
cnr, _ := obj.ContainerID()
|
|
||||||
id, _ := obj.ID()
|
|
||||||
err = s.metaBase.Lock(cnr, id, locked)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not lock objects: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var mPrm meta.PutPrm
|
var mPrm meta.PutPrm
|
||||||
mPrm.SetObject(obj)
|
mPrm.SetObject(obj)
|
||||||
mPrm.SetStorageID(descriptor)
|
mPrm.SetStorageID(descriptor)
|
||||||
|
|
||||||
_, err := s.metaBase.Put(mPrm)
|
_, err = s.metaBase.Put(mPrm)
|
||||||
if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -241,6 +206,54 @@ func (s *Shard) refillMetabase() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Shard) refillLockObject(obj *objectSDK.Object) error {
|
||||||
|
var lock objectSDK.Lock
|
||||||
|
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
||||||
|
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
locked := make([]oid.ID, lock.NumberOfMembers())
|
||||||
|
lock.ReadMembers(locked)
|
||||||
|
|
||||||
|
cnr, _ := obj.ContainerID()
|
||||||
|
id, _ := obj.ID()
|
||||||
|
err := s.metaBase.Lock(cnr, id, locked)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not lock objects: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Shard) refillTombstoneObject(obj *objectSDK.Object) error {
|
||||||
|
tombstone := objectSDK.NewTombstone()
|
||||||
|
|
||||||
|
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||||
|
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tombAddr := object.AddressOf(obj)
|
||||||
|
memberIDs := tombstone.Members()
|
||||||
|
tombMembers := make([]oid.Address, 0, len(memberIDs))
|
||||||
|
|
||||||
|
for i := range memberIDs {
|
||||||
|
a := tombAddr
|
||||||
|
a.SetObject(memberIDs[i])
|
||||||
|
|
||||||
|
tombMembers = append(tombMembers, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
var inhumePrm meta.InhumePrm
|
||||||
|
|
||||||
|
inhumePrm.SetTombstoneAddress(tombAddr)
|
||||||
|
inhumePrm.SetAddresses(tombMembers...)
|
||||||
|
|
||||||
|
_, err := s.metaBase.Inhume(inhumePrm)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not inhume objects: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close releases all Shard's components.
|
// Close releases all Shard's components.
|
||||||
func (s *Shard) Close() error {
|
func (s *Shard) Close() error {
|
||||||
components := []interface{ Close() error }{}
|
components := []interface{ Close() error }{}
|
||||||
|
|
|
@ -42,7 +42,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeApply implements the pilorama.Forest interface.
|
// TreeApply implements the pilorama.Forest interface.
|
||||||
func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
func (s *Shard) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
|
||||||
if s.pilorama == nil {
|
if s.pilorama == nil {
|
||||||
return ErrPiloramaDisabled
|
return ErrPiloramaDisabled
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M
|
||||||
if s.info.Mode.ReadOnly() {
|
if s.info.Mode.ReadOnly() {
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
return s.pilorama.TreeApply(d, treeID, m, backgroundSync)
|
return s.pilorama.TreeApply(cnr, treeID, m, backgroundSync)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeGetByPath implements the pilorama.Forest interface.
|
// TreeGetByPath implements the pilorama.Forest interface.
|
||||||
|
|
|
@ -426,7 +426,7 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: funlen, gocognit
|
// nolint: funlen
|
||||||
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
|
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
|
||||||
body := req.GetBody()
|
body := req.GetBody()
|
||||||
|
|
||||||
|
@ -459,138 +459,140 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
|
||||||
body: resp.GetBody(),
|
body: resp.GetBody(),
|
||||||
})
|
})
|
||||||
|
|
||||||
if !commonPrm.LocalOnly() {
|
if commonPrm.LocalOnly() {
|
||||||
var onceResign sync.Once
|
return p, nil
|
||||||
|
|
||||||
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
key, err := s.keyStorage.GetKey(nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// once compose and resign forwarding request
|
|
||||||
onceResign.Do(func() {
|
|
||||||
// compose meta header of the local server
|
|
||||||
metaHdr := new(session.RequestMetaHeader)
|
|
||||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
|
||||||
// TODO: #1165 think how to set the other fields
|
|
||||||
metaHdr.SetOrigin(meta)
|
|
||||||
writeCurrentVersion(metaHdr)
|
|
||||||
|
|
||||||
req.SetMetaHeader(metaHdr)
|
|
||||||
|
|
||||||
err = signature.SignServiceMessage(key, req)
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// code below is copy-pasted from c.GetObjectHeader implementation,
|
|
||||||
// perhaps it is worth highlighting the utility function in frostfs-api-go
|
|
||||||
|
|
||||||
// send Head request
|
|
||||||
var headResp *objectV2.HeadResponse
|
|
||||||
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
|
||||||
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify response key
|
|
||||||
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify response structure
|
|
||||||
if err := signature.VerifyServiceMessage(headResp); err != nil {
|
|
||||||
return nil, fmt.Errorf("response verification failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
hdr *objectV2.Header
|
|
||||||
idSig *refs.Signature
|
|
||||||
)
|
|
||||||
|
|
||||||
switch v := headResp.GetBody().GetHeaderPart().(type) {
|
|
||||||
case nil:
|
|
||||||
return nil, fmt.Errorf("unexpected header type %T", v)
|
|
||||||
case *objectV2.ShortHeader:
|
|
||||||
if !body.GetMainOnly() {
|
|
||||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
|
||||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
h := v
|
|
||||||
|
|
||||||
hdr = new(objectV2.Header)
|
|
||||||
hdr.SetPayloadLength(h.GetPayloadLength())
|
|
||||||
hdr.SetVersion(h.GetVersion())
|
|
||||||
hdr.SetOwnerID(h.GetOwnerID())
|
|
||||||
hdr.SetObjectType(h.GetObjectType())
|
|
||||||
hdr.SetCreationEpoch(h.GetCreationEpoch())
|
|
||||||
hdr.SetPayloadHash(h.GetPayloadHash())
|
|
||||||
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
|
|
||||||
case *objectV2.HeaderWithSignature:
|
|
||||||
if body.GetMainOnly() {
|
|
||||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
|
||||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
hdrWithSig := v
|
|
||||||
if hdrWithSig == nil {
|
|
||||||
return nil, errors.New("nil object part")
|
|
||||||
}
|
|
||||||
|
|
||||||
hdr = hdrWithSig.GetHeader()
|
|
||||||
idSig = hdrWithSig.GetSignature()
|
|
||||||
|
|
||||||
if idSig == nil {
|
|
||||||
// TODO(@cthulhu-rider): #1387 use "const" error
|
|
||||||
return nil, errors.New("missing signature")
|
|
||||||
}
|
|
||||||
|
|
||||||
binID, err := objAddr.Object().Marshal()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("marshal ID: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var sig frostfscrypto.Signature
|
|
||||||
if err := sig.ReadFromV2(*idSig); err != nil {
|
|
||||||
return nil, fmt.Errorf("can't read signature: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !sig.Verify(binID) {
|
|
||||||
return nil, errors.New("invalid object ID signature")
|
|
||||||
}
|
|
||||||
case *objectV2.SplitInfo:
|
|
||||||
si := object.NewSplitInfoFromV2(v)
|
|
||||||
|
|
||||||
return nil, object.NewSplitInfoError(si)
|
|
||||||
}
|
|
||||||
|
|
||||||
objv2 := new(objectV2.Object)
|
|
||||||
objv2.SetHeader(hdr)
|
|
||||||
objv2.SetSignature(idSig)
|
|
||||||
|
|
||||||
obj := object.NewFromV2(objv2)
|
|
||||||
obj.SetID(objAddr.Object())
|
|
||||||
|
|
||||||
// convert the object
|
|
||||||
return obj, nil
|
|
||||||
}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var onceResign sync.Once
|
||||||
|
|
||||||
|
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
key, err := s.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// once compose and resign forwarding request
|
||||||
|
onceResign.Do(func() {
|
||||||
|
// compose meta header of the local server
|
||||||
|
metaHdr := new(session.RequestMetaHeader)
|
||||||
|
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||||
|
// TODO: #1165 think how to set the other fields
|
||||||
|
metaHdr.SetOrigin(meta)
|
||||||
|
writeCurrentVersion(metaHdr)
|
||||||
|
|
||||||
|
req.SetMetaHeader(metaHdr)
|
||||||
|
|
||||||
|
err = signature.SignServiceMessage(key, req)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// code below is copy-pasted from c.GetObjectHeader implementation,
|
||||||
|
// perhaps it is worth highlighting the utility function in frostfs-api-go
|
||||||
|
|
||||||
|
// send Head request
|
||||||
|
var headResp *objectV2.HeadResponse
|
||||||
|
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
|
||||||
|
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify response key
|
||||||
|
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify response structure
|
||||||
|
if err := signature.VerifyServiceMessage(headResp); err != nil {
|
||||||
|
return nil, fmt.Errorf("response verification failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
hdr *objectV2.Header
|
||||||
|
idSig *refs.Signature
|
||||||
|
)
|
||||||
|
|
||||||
|
switch v := headResp.GetBody().GetHeaderPart().(type) {
|
||||||
|
case nil:
|
||||||
|
return nil, fmt.Errorf("unexpected header type %T", v)
|
||||||
|
case *objectV2.ShortHeader:
|
||||||
|
if !body.GetMainOnly() {
|
||||||
|
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||||
|
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
h := v
|
||||||
|
|
||||||
|
hdr = new(objectV2.Header)
|
||||||
|
hdr.SetPayloadLength(h.GetPayloadLength())
|
||||||
|
hdr.SetVersion(h.GetVersion())
|
||||||
|
hdr.SetOwnerID(h.GetOwnerID())
|
||||||
|
hdr.SetObjectType(h.GetObjectType())
|
||||||
|
hdr.SetCreationEpoch(h.GetCreationEpoch())
|
||||||
|
hdr.SetPayloadHash(h.GetPayloadHash())
|
||||||
|
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
|
||||||
|
case *objectV2.HeaderWithSignature:
|
||||||
|
if body.GetMainOnly() {
|
||||||
|
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||||
|
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
hdrWithSig := v
|
||||||
|
if hdrWithSig == nil {
|
||||||
|
return nil, errors.New("nil object part")
|
||||||
|
}
|
||||||
|
|
||||||
|
hdr = hdrWithSig.GetHeader()
|
||||||
|
idSig = hdrWithSig.GetSignature()
|
||||||
|
|
||||||
|
if idSig == nil {
|
||||||
|
// TODO(@cthulhu-rider): #1387 use "const" error
|
||||||
|
return nil, errors.New("missing signature")
|
||||||
|
}
|
||||||
|
|
||||||
|
binID, err := objAddr.Object().Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var sig frostfscrypto.Signature
|
||||||
|
if err := sig.ReadFromV2(*idSig); err != nil {
|
||||||
|
return nil, fmt.Errorf("can't read signature: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !sig.Verify(binID) {
|
||||||
|
return nil, errors.New("invalid object ID signature")
|
||||||
|
}
|
||||||
|
case *objectV2.SplitInfo:
|
||||||
|
si := object.NewSplitInfoFromV2(v)
|
||||||
|
|
||||||
|
return nil, object.NewSplitInfoError(si)
|
||||||
|
}
|
||||||
|
|
||||||
|
objv2 := new(objectV2.Object)
|
||||||
|
objv2.SetHeader(hdr)
|
||||||
|
objv2.SetSignature(idSig)
|
||||||
|
|
||||||
|
obj := object.NewFromV2(objv2)
|
||||||
|
obj.SetID(objAddr.Object())
|
||||||
|
|
||||||
|
// convert the object
|
||||||
|
return obj, nil
|
||||||
|
}))
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ type replicationTask struct {
|
||||||
|
|
||||||
type applyOp struct {
|
type applyOp struct {
|
||||||
treeID string
|
treeID string
|
||||||
pilorama.CIDDescriptor
|
cid cidSDK.ID
|
||||||
pilorama.Move
|
pilorama.Move
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ func (s *Service) localReplicationWorker() {
|
||||||
case <-s.closeCh:
|
case <-s.closeCh:
|
||||||
return
|
return
|
||||||
case op := <-s.replicateLocalCh:
|
case op := <-s.replicateLocalCh:
|
||||||
err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
|
err := s.forest.TreeApply(op.cid, op.treeID, &op.Move, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error("failed to apply replicated operation",
|
s.log.Error("failed to apply replicated operation",
|
||||||
zap.String("err", err.Error()))
|
zap.String("err", err.Error()))
|
||||||
|
@ -52,7 +52,7 @@ func (s *Service) localReplicationWorker() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) replicationWorker() {
|
func (s *Service) replicationWorker(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.closeCh:
|
case <-s.closeCh:
|
||||||
|
@ -64,13 +64,13 @@ func (s *Service) replicationWorker() {
|
||||||
task.n.IterateNetworkEndpoints(func(addr string) bool {
|
task.n.IterateNetworkEndpoints(func(addr string) bool {
|
||||||
lastAddr = addr
|
lastAddr = addr
|
||||||
|
|
||||||
c, err := s.cache.get(context.Background(), addr)
|
c, err := s.cache.get(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lastErr = fmt.Errorf("can't create client: %w", err)
|
lastErr = fmt.Errorf("can't create client: %w", err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), s.replicatorTimeout)
|
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
|
||||||
_, lastErr = c.Apply(ctx, task.req)
|
_, lastErr = c.Apply(ctx, task.req)
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
@ -94,8 +94,7 @@ func (s *Service) replicationWorker() {
|
||||||
|
|
||||||
func (s *Service) replicateLoop(ctx context.Context) {
|
func (s *Service) replicateLoop(ctx context.Context) {
|
||||||
for i := 0; i < s.replicatorWorkerCount; i++ {
|
for i := 0; i < s.replicatorWorkerCount; i++ {
|
||||||
//nolint: contextcheck
|
go s.replicationWorker(ctx)
|
||||||
go s.replicationWorker()
|
|
||||||
go s.localReplicationWorker()
|
go s.localReplicationWorker()
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
|
@ -468,7 +468,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
|
|
||||||
key := req.GetSignature().GetKey()
|
key := req.GetSignature().GetKey()
|
||||||
|
|
||||||
_, pos, size, err := s.getContainerInfo(cid, key)
|
_, pos, _, err := s.getContainerInfo(cid, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -485,8 +485,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case s.replicateLocalCh <- applyOp{
|
case s.replicateLocalCh <- applyOp{
|
||||||
treeID: req.GetBody().GetTreeId(),
|
treeID: req.GetBody().GetTreeId(),
|
||||||
CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
|
cid: cid,
|
||||||
Move: pilorama.Move{
|
Move: pilorama.Move{
|
||||||
Parent: op.GetParentId(),
|
Parent: op.GetParentId(),
|
||||||
Child: op.GetChildId(),
|
Child: op.GetChildId(),
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -40,11 +41,6 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
|
||||||
return ErrNotInContainer
|
return ErrNotInContainer
|
||||||
}
|
}
|
||||||
|
|
||||||
var d pilorama.CIDDescriptor
|
|
||||||
d.CID = cid
|
|
||||||
d.Position = pos
|
|
||||||
d.Size = len(nodes)
|
|
||||||
|
|
||||||
nodes = randomizeNodeOrder(nodes, pos)
|
nodes = randomizeNodeOrder(nodes, pos)
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -87,18 +83,18 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tid := range treesToSync {
|
for _, tid := range treesToSync {
|
||||||
h, err := s.forest.TreeLastSyncHeight(d.CID, tid)
|
h, err := s.forest.TreeLastSyncHeight(cid, tid)
|
||||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
s.log.Warn("could not get last synchronized height for a tree",
|
s.log.Warn("could not get last synchronized height for a tree",
|
||||||
zap.Stringer("cid", d.CID),
|
zap.Stringer("cid", cid),
|
||||||
zap.String("tree", tid))
|
zap.String("tree", tid))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
newHeight := s.synchronizeTree(ctx, d, h, tid, nodes)
|
newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes)
|
||||||
if h < newHeight {
|
if h < newHeight {
|
||||||
if err := s.forest.TreeUpdateLastSyncHeight(d.CID, tid, newHeight); err != nil {
|
if err := s.forest.TreeUpdateLastSyncHeight(cid, tid, newHeight); err != nil {
|
||||||
s.log.Warn("could not update last synchronized height for a tree",
|
s.log.Warn("could not update last synchronized height for a tree",
|
||||||
zap.Stringer("cid", d.CID),
|
zap.Stringer("cid", cid),
|
||||||
zap.String("tree", tid))
|
zap.String("tree", tid))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -118,24 +114,19 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
|
||||||
return ErrNotInContainer
|
return ErrNotInContainer
|
||||||
}
|
}
|
||||||
|
|
||||||
var d pilorama.CIDDescriptor
|
|
||||||
d.CID = cid
|
|
||||||
d.Position = pos
|
|
||||||
d.Size = len(nodes)
|
|
||||||
|
|
||||||
nodes = randomizeNodeOrder(nodes, pos)
|
nodes = randomizeNodeOrder(nodes, pos)
|
||||||
if len(nodes) == 0 {
|
if len(nodes) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
s.synchronizeTree(ctx, d, 0, treeID, nodes)
|
s.synchronizeTree(ctx, cid, 0, treeID, nodes)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, from uint64,
|
func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint64,
|
||||||
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
|
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
|
||||||
s.log.Debug("synchronize tree",
|
s.log.Debug("synchronize tree",
|
||||||
zap.Stringer("cid", d.CID),
|
zap.Stringer("cid", cid),
|
||||||
zap.String("tree", treeID),
|
zap.String("tree", treeID),
|
||||||
zap.Uint64("from", from))
|
zap.Uint64("from", from))
|
||||||
|
|
||||||
|
@ -157,7 +148,7 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
|
||||||
|
|
||||||
treeClient := NewTreeServiceClient(cc)
|
treeClient := NewTreeServiceClient(cc)
|
||||||
for {
|
for {
|
||||||
h, err := s.synchronizeSingle(ctx, d, treeID, height, treeClient)
|
h, err := s.synchronizeSingle(ctx, cid, treeID, height, treeClient)
|
||||||
if height < h {
|
if height < h {
|
||||||
height = h
|
height = h
|
||||||
}
|
}
|
||||||
|
@ -179,9 +170,9 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
|
||||||
return newHeight
|
return newHeight
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescriptor, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
|
func (s *Service) synchronizeSingle(ctx context.Context, cid cidSDK.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
|
||||||
rawCID := make([]byte, sha256.Size)
|
rawCID := make([]byte, sha256.Size)
|
||||||
d.CID.Encode(rawCID)
|
cid.Encode(rawCID)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
newHeight := height
|
newHeight := height
|
||||||
|
@ -211,7 +202,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto
|
||||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
if err := s.forest.TreeApply(d, treeID, m, true); err != nil {
|
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
if m.Time > newHeight {
|
if m.Time > newHeight {
|
||||||
|
@ -255,7 +246,6 @@ func (s *Service) SynchronizeAll() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: funlen, gocognit
|
|
||||||
func (s *Service) syncLoop(ctx context.Context) {
|
func (s *Service) syncLoop(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -272,86 +262,99 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
|
newMap, cnrsToSync := s.containersToSync(cnrs)
|
||||||
cnrsToSync := make([]cid.ID, 0, len(cnrs))
|
|
||||||
|
|
||||||
var removed []cid.ID
|
s.syncContainers(ctx, cnrsToSync)
|
||||||
for _, cnr := range cnrs {
|
|
||||||
_, pos, err := s.getContainerNodes(cnr)
|
|
||||||
if err != nil {
|
|
||||||
s.log.Error("could not calculate container nodes",
|
|
||||||
zap.Stringer("cid", cnr),
|
|
||||||
zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if pos < 0 {
|
s.removeContainers(ctx, newMap)
|
||||||
// node is not included in the container.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
newMap[cnr] = struct{}{}
|
|
||||||
cnrsToSync = append(cnrsToSync, cnr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// sync new containers
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
for _, cnr := range cnrsToSync {
|
|
||||||
wg.Add(1)
|
|
||||||
cnr := cnr
|
|
||||||
err := s.syncPool.Submit(func() {
|
|
||||||
defer wg.Done()
|
|
||||||
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
|
|
||||||
|
|
||||||
err := s.synchronizeAllTrees(ctx, cnr)
|
|
||||||
if err != nil {
|
|
||||||
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
wg.Done()
|
|
||||||
s.log.Error("could not query trees for synchronization",
|
|
||||||
zap.Stringer("cid", cnr),
|
|
||||||
zap.Error(err))
|
|
||||||
if errors.Is(err, ants.ErrPoolClosed) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
s.cnrMapMtx.Lock()
|
|
||||||
for cnr := range s.cnrMap {
|
|
||||||
if _, ok := newMap[cnr]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
removed = append(removed, cnr)
|
|
||||||
}
|
|
||||||
for i := range removed {
|
|
||||||
delete(s.cnrMap, removed[i])
|
|
||||||
}
|
|
||||||
s.cnrMapMtx.Unlock()
|
|
||||||
|
|
||||||
for _, cnr := range removed {
|
|
||||||
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
|
|
||||||
|
|
||||||
err = s.DropTree(ctx, cnr, "")
|
|
||||||
if err != nil {
|
|
||||||
s.log.Error("could not remove redundant tree",
|
|
||||||
zap.Stringer("cid", cnr),
|
|
||||||
zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.log.Debug("trees have been synchronized")
|
s.log.Debug("trees have been synchronized")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
|
||||||
|
// sync new containers
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for _, cnr := range cnrs {
|
||||||
|
wg.Add(1)
|
||||||
|
cnr := cnr
|
||||||
|
err := s.syncPool.Submit(func() {
|
||||||
|
defer wg.Done()
|
||||||
|
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
|
||||||
|
|
||||||
|
err := s.synchronizeAllTrees(ctx, cnr)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
wg.Done()
|
||||||
|
s.log.Error("could not query trees for synchronization",
|
||||||
|
zap.Stringer("cid", cnr),
|
||||||
|
zap.Error(err))
|
||||||
|
if errors.Is(err, ants.ErrPoolClosed) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
|
||||||
|
s.cnrMapMtx.Lock()
|
||||||
|
defer s.cnrMapMtx.Unlock()
|
||||||
|
|
||||||
|
var removed []cid.ID
|
||||||
|
for cnr := range s.cnrMap {
|
||||||
|
if _, ok := newContainers[cnr]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
removed = append(removed, cnr)
|
||||||
|
}
|
||||||
|
for i := range removed {
|
||||||
|
delete(s.cnrMap, removed[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, cnr := range removed {
|
||||||
|
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
|
||||||
|
|
||||||
|
err := s.DropTree(ctx, cnr, "")
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error("could not remove redundant tree",
|
||||||
|
zap.Stringer("cid", cnr),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) containersToSync(cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) {
|
||||||
|
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
|
||||||
|
cnrsToSync := make([]cid.ID, 0, len(cnrs))
|
||||||
|
|
||||||
|
for _, cnr := range cnrs {
|
||||||
|
_, pos, err := s.getContainerNodes(cnr)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error("could not calculate container nodes",
|
||||||
|
zap.Stringer("cid", cnr),
|
||||||
|
zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if pos < 0 {
|
||||||
|
// node is not included in the container.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
newMap[cnr] = struct{}{}
|
||||||
|
cnrsToSync = append(cnrsToSync, cnr)
|
||||||
|
}
|
||||||
|
return newMap, cnrsToSync
|
||||||
|
}
|
||||||
|
|
||||||
// randomizeNodeOrder shuffles nodes and removes not a `pos` index.
|
// randomizeNodeOrder shuffles nodes and removes not a `pos` index.
|
||||||
// It is assumed that 0 <= pos < len(nodes).
|
// It is assumed that 0 <= pos < len(nodes).
|
||||||
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
|
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
|
||||||
|
|
Loading…
Add table
Reference in a new issue