Fix some of the linter exceptions #156

Merged
fyrchik merged 7 commits from fyrchik/frostfs-node:linter-fixes into master 2023-03-22 07:14:19 +00:00
13 changed files with 460 additions and 444 deletions

@@ -44,7 +44,6 @@ const (
notaryEnabled = true
)
// nolint: funlen, gocognit
func dumpBalances(cmd *cobra.Command, _ []string) error {
var (
dumpStorage, _ = cmd.Flags().GetBool(dumpBalancesStorageFlag)
@@ -84,86 +83,110 @@ func dumpBalances(cmd *cobra.Command, _ []string) error {
printBalances(cmd, "Inner ring nodes balances:", irList)
if dumpStorage {
arr, err := unwrap.Array(inv.Call(nmHash, "netmap"))
if err != nil {
return errors.New("can't fetch the list of storage nodes")
}
snList := make([]accBalancePair, len(arr))
for i := range arr {
node, ok := arr[i].Value().([]stackitem.Item)
if !ok || len(node) == 0 {
return errors.New("can't parse the list of storage nodes")
}
bs, err := node[0].TryBytes()
if err != nil {
return errors.New("can't parse the list of storage nodes")
}
var ni netmap.NodeInfo
if err := ni.Unmarshal(bs); err != nil {
return fmt.Errorf("can't parse the list of storage nodes: %w", err)
}
pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256())
if err != nil {
return fmt.Errorf("can't parse storage node public key: %w", err)
}
snList[i].scriptHash = pub.GetScriptHash()
}
if err := fetchBalances(inv, gas.Hash, snList); err != nil {
if err := printStorageNodeBalances(cmd, inv, nmHash); err != nil {
return err
}
printBalances(cmd, "\nStorage node balances:", snList)
}
if dumpProxy {
h, err := nnsResolveHash(inv, nnsCs.Hash, proxyContract+".frostfs")
if err != nil {
return fmt.Errorf("can't get hash of the proxy contract: %w", err)
}
proxyList := []accBalancePair{{scriptHash: h}}
if err := fetchBalances(inv, gas.Hash, proxyList); err != nil {
if err := printProxyContractBalance(cmd, inv, nnsCs.Hash); err != nil {
return err
}
printBalances(cmd, "\nProxy contract balance:", proxyList)
}
if dumpAlphabet {
alphaList := make([]accBalancePair, len(irList))
w := io.NewBufBinWriter()
for i := range alphaList {
emit.AppCall(w.BinWriter, nnsCs.Hash, "resolve", callflag.ReadOnly,
getAlphabetNNSDomain(i),
int64(nns.TXT))
}
if w.Err != nil {
panic(w.Err)
}
alphaRes, err := c.InvokeScript(w.Bytes(), nil)
if err != nil {
return fmt.Errorf("can't fetch info from NNS: %w", err)
}
for i := range alphaList {
h, err := parseNNSResolveResult(alphaRes.Stack[i])
if err != nil {
return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err)
}
alphaList[i].scriptHash = h
}
if err := fetchBalances(inv, gas.Hash, alphaList); err != nil {
if err := printAlphabetContractBalances(cmd, c, inv, len(irList), nnsCs.Hash); err != nil {
return err
}
printBalances(cmd, "\nAlphabet contracts balances:", alphaList)
}
return nil
}
func printStorageNodeBalances(cmd *cobra.Command, inv *invoker.Invoker, nmHash util.Uint160) error {
arr, err := unwrap.Array(inv.Call(nmHash, "netmap"))
if err != nil {
return errors.New("can't fetch the list of storage nodes")
}
snList := make([]accBalancePair, len(arr))
for i := range arr {
node, ok := arr[i].Value().([]stackitem.Item)
if !ok || len(node) == 0 {
return errors.New("can't parse the list of storage nodes")
}
bs, err := node[0].TryBytes()
if err != nil {
return errors.New("can't parse the list of storage nodes")
}
var ni netmap.NodeInfo
if err := ni.Unmarshal(bs); err != nil {
return fmt.Errorf("can't parse the list of storage nodes: %w", err)
}
pub, err := keys.NewPublicKeyFromBytes(ni.PublicKey(), elliptic.P256())
if err != nil {
return fmt.Errorf("can't parse storage node public key: %w", err)
}
snList[i].scriptHash = pub.GetScriptHash()
}
if err := fetchBalances(inv, gas.Hash, snList); err != nil {
return err
}
printBalances(cmd, "\nStorage node balances:", snList)
return nil
}
func printProxyContractBalance(cmd *cobra.Command, inv *invoker.Invoker, nnsHash util.Uint160) error {
h, err := nnsResolveHash(inv, nnsHash, proxyContract+".frostfs")
if err != nil {
return fmt.Errorf("can't get hash of the proxy contract: %w", err)
}
proxyList := []accBalancePair{{scriptHash: h}}
if err := fetchBalances(inv, gas.Hash, proxyList); err != nil {
return err
}
printBalances(cmd, "\nProxy contract balance:", proxyList)
return nil
}
func printAlphabetContractBalances(cmd *cobra.Command, c Client, inv *invoker.Invoker, count int, nnsHash util.Uint160) error {
alphaList := make([]accBalancePair, count)
w := io.NewBufBinWriter()
for i := range alphaList {
emit.AppCall(w.BinWriter, nnsHash, "resolve", callflag.ReadOnly,
getAlphabetNNSDomain(i),
int64(nns.TXT))
}
if w.Err != nil {
panic(w.Err)
}
alphaRes, err := c.InvokeScript(w.Bytes(), nil)
if err != nil {
return fmt.Errorf("can't fetch info from NNS: %w", err)
}
for i := range alphaList {
h, err := parseNNSResolveResult(alphaRes.Stack[i])
if err != nil {
return fmt.Errorf("can't fetch the alphabet contract #%d hash: %w", i, err)
}
alphaList[i].scriptHash = h
}
if err := fetchBalances(inv, gas.Hash, alphaList); err != nil {
return err
}
printBalances(cmd, "\nAlphabet contracts balances:", alphaList)
return nil
}
func fetchIRNodes(c Client, nmHash, desigHash util.Uint160) ([]accBalancePair, error) {
var irList []accBalancePair
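
A note on the pattern used throughout this PR: rather than suppressing `funlen`/`gocognit` with `nolint` comments, the oversized function is split into focused helpers, which is exactly what happens to `dumpBalances` above. A minimal sketch of the transformation, with hypothetical names rather than the actual frostfs-node code:

```go
package main

import "fmt"

// Before the split, one function handled every dump and needed
// "// nolint: funlen, gocognit" to pass the linters. After it,
// the top-level function only dispatches, and each branch lives
// in a helper that is trivially under the thresholds.
func dumpBalances(storage, proxy, alphabet bool) error {
	if storage {
		if err := printStorageBalances(); err != nil {
			return err
		}
	}
	if proxy {
		if err := printProxyBalance(); err != nil {
			return err
		}
	}
	if alphabet {
		if err := printAlphabetBalances(); err != nil {
			return err
		}
	}
	return nil
}

// Hypothetical helpers standing in for printStorageNodeBalances,
// printProxyContractBalance and printAlphabetContractBalances.
func printStorageBalances() error  { fmt.Println("storage: ..."); return nil }
func printProxyBalance() error     { fmt.Println("proxy: ..."); return nil }
func printAlphabetBalances() error { fmt.Println("alphabet: ..."); return nil }

func main() {
	if err := dumpBalances(true, true, true); err != nil {
		fmt.Println("error:", err)
	}
}
```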

@@ -51,17 +51,17 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a
}
// TreeApply implements the pilorama.Forest interface.
func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
index, lst, err := e.getTreeShard(d.CID, treeID)
func (e *StorageEngine) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
index, lst, err := e.getTreeShard(cnr, treeID)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
return err
}
err = lst[index].TreeApply(d, treeID, m, backgroundSync)
err = lst[index].TreeApply(cnr, treeID, m, backgroundSync)
if err != nil {
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
e.reportShardError(lst[index], "can't perform `TreeApply`", err,
zap.Stringer("cid", d.CID),
zap.Stringer("cid", cnr),
zap.String("tree", treeID))
}
return err

@@ -138,57 +138,57 @@ func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
// Operation is very resource-intensive, which is caused by the admissibility
// of multiple locks. Also, if we knew what objects are locked, it would be
// possible to speed up the execution.
//
// nolint: gocognit
func freePotentialLocks(tx *bbolt.Tx, idCnr cid.ID, locker oid.ID) error {
bucketLocked := tx.Bucket(bucketNameLocked)
if bucketLocked != nil {
key := make([]byte, cidSize)
idCnr.Encode(key)
if bucketLocked == nil {
return nil
}
bucketLockedContainer := bucketLocked.Bucket(key)
if bucketLockedContainer != nil {
keyLocker := objectKey(locker, key)
return bucketLockedContainer.ForEach(func(k, v []byte) error {
keyLockers, err := decodeList(v)
if err != nil {
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
}
key := make([]byte, cidSize)
idCnr.Encode(key)
for i := range keyLockers {
if bytes.Equal(keyLockers[i], keyLocker) {
if len(keyLockers) == 1 {
// locker was all alone
err = bucketLockedContainer.Delete(k)
if err != nil {
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
}
} else {
// exclude locker
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
bucketLockedContainer := bucketLocked.Bucket(key)
if bucketLockedContainer == nil {
return nil
}
v, err = encodeList(keyLockers)
if err != nil {
return fmt.Errorf("encode updated list of lockers: %w", err)
}
keyLocker := objectKey(locker, key)
return bucketLockedContainer.ForEach(func(k, v []byte) error {
keyLockers, err := decodeList(v)
if err != nil {
return fmt.Errorf("decode list of lockers in locked bucket: %w", err)
}
// update the record
err = bucketLockedContainer.Put(k, v)
if err != nil {
return fmt.Errorf("update list of lockers: %w", err)
}
}
for i := range keyLockers {
if bytes.Equal(keyLockers[i], keyLocker) {
if len(keyLockers) == 1 {
// locker was all alone
err = bucketLockedContainer.Delete(k)
if err != nil {
return fmt.Errorf("delete locked object record from locked bucket: %w", err)
}
} else {
// exclude locker
keyLockers = append(keyLockers[:i], keyLockers[i+1:]...)
return nil
v, err = encodeList(keyLockers)
if err != nil {
return fmt.Errorf("encode updated list of lockers: %w", err)
}
// update the record
err = bucketLockedContainer.Put(k, v)
if err != nil {
return fmt.Errorf("update list of lockers: %w", err)
}
}
return nil
})
}
}
}
return nil
return nil
})
}
// IsLockedPrm groups the parameters of IsLocked operation.
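
The `freePotentialLocks` change above is a guard-clause rewrite: each `if x != nil { ... }` wrapper becomes an early `if x == nil { return nil }` exit, which removes nesting levels and with them the `gocognit` score that forced the old `nolint`. A generic sketch of the same transformation, assuming a toy bucket type instead of the real bbolt API:

```go
package main

import "fmt"

type bucket struct{ children map[string]*bucket }

func (b *bucket) child(name string) *bucket { return b.children[name] }

// Nested version: every successful lookup adds a nesting level,
// and the cognitive-complexity score grows with each one.
func visitNested(root *bucket) {
	if root != nil {
		if locked := root.child("locked"); locked != nil {
			if cnr := locked.child("container"); cnr != nil {
				fmt.Println("found container bucket")
			}
		}
	}
}

// Guard-clause version: the same logic flattened with early returns,
// as done for freePotentialLocks in this PR.
func visitFlat(root *bucket) {
	if root == nil {
		return
	}
	locked := root.child("locked")
	if locked == nil {
		return
	}
	cnr := locked.child("container")
	if cnr == nil {
		return
	}
	fmt.Println("found container bucket")
}

func main() {
	root := &bucket{children: map[string]*bucket{
		"locked": {children: map[string]*bucket{"container": {}}},
	}}
	visitNested(root)
	visitFlat(root)
}
```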

@@ -327,11 +327,7 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
}
// TreeApply implements the Forest interface.
func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error {
if !d.checkValid() {
return ErrInvalidCIDDescriptor
}
func (t *boltForest) TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
@@ -344,7 +340,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
if backgroundSync {
var seen bool
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(d.CID, treeID))
treeRoot := tx.Bucket(bucketName(cnr, treeID))
if treeRoot == nil {
return nil
}
@@ -362,7 +358,7 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
}
if t.db.MaxBatchSize == 1 {
fullID := bucketName(d.CID, treeID)
fullID := bucketName(cnr, treeID)
return t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil {
@@ -375,11 +371,11 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
}
ch := make(chan error, 1)
t.addBatch(d, treeID, m, ch)
t.addBatch(cnr, treeID, m, ch)
return <-ch
}
func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
t.mtx.Lock()
for i := 0; i < len(t.batches); i++ {
t.batches[i].mtx.Lock()
@@ -391,7 +387,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
continue
}
found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
found := t.batches[i].cid.Equals(cnr) && t.batches[i].treeID == treeID
if found {
t.batches[i].results = append(t.batches[i].results, ch)
t.batches[i].operations = append(t.batches[i].operations, m)
@@ -412,7 +408,7 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
}
b := &batch{
forest: t,
cid: d.CID,
cid: cnr,
treeID: treeID,
results: []chan<- error{ch},
operations: []*Move{m},
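
For context on `addBatch`: pilorama coalesces operations for the same container and tree into one write batch, so the only thing this change touches is the matching key (`cid.Equals(cnr)` instead of `cid.Equals(d.CID)`). A toy sketch of that coalescing idea, with simplified types in place of the real pilorama structures:

```go
package main

import "fmt"

type move struct{ child, parent uint64 }

type batch struct {
	cid    string // container ID, simplified to a string here
	treeID string
	ops    []move
}

// addToBatch appends an operation to an open batch for the same
// (cid, treeID) pair, or starts a new one, mirroring how
// boltForest.addBatch groups operations before a single bbolt update.
func addToBatch(batches []*batch, cid, treeID string, m move) []*batch {
	for _, b := range batches {
		if b.cid == cid && b.treeID == treeID {
			b.ops = append(b.ops, m)
			return batches
		}
	}
	return append(batches, &batch{cid: cid, treeID: treeID, ops: []move{m}})
}

func main() {
	var batches []*batch
	batches = addToBatch(batches, "cnr1", "version", move{child: 1})
	batches = addToBatch(batches, "cnr1", "version", move{child: 2})
	batches = addToBatch(batches, "cnr2", "version", move{child: 3})
	for _, b := range batches {
		fmt.Printf("%s/%s: %d op(s)\n", b.cid, b.treeID, len(b.ops))
	}
}
```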

@@ -5,6 +5,7 @@ import (
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
@@ -93,12 +94,8 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
}
// TreeApply implements the Forest interface.
func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move, _ bool) error {
if !d.checkValid() {
return ErrInvalidCIDDescriptor
}
fullID := d.CID.String() + "/" + treeID
func (f *memoryForest) TreeApply(cnr cid.ID, treeID string, op *Move, _ bool) error {
fullID := cnr.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
s = newState()

@@ -411,21 +411,10 @@ func TestForest_Apply(t *testing.T) {
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
t.Run("invalid descriptor", func(t *testing.T) {
s := constructor(t)
err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{
Child: 10,
Parent: 0,
Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}},
}, false)
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
})
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
require.NoError(t, s.TreeApply(d, treeID, &Move{
require.NoError(t, s.TreeApply(cid, treeID, &Move{
Child: child,
Parent: parent,
Meta: meta,
@@ -465,7 +454,6 @@ func TestForest_GetOpLog(t *testing.T) {
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
logs := []Move{
{
@@ -491,7 +479,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Op
})
for i := range logs {
require.NoError(t, s.TreeApply(d, treeID, &logs[i], false))
require.NoError(t, s.TreeApply(cid, treeID, &logs[i], false))
}
testGetOpLog := func(t *testing.T, height uint64, m Move) {
@@ -533,13 +521,12 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
cid := cidtest.ID()
treeID := "version"
d := CIDDescriptor{cid, 0, 1}
t.Run("empty state, no panic", func(t *testing.T) {
checkExists(t, false, cid, treeID)
})
require.NoError(t, s.TreeApply(d, treeID, &Move{Parent: 0, Child: 1}, false))
require.NoError(t, s.TreeApply(cid, treeID, &Move{Parent: 0, Child: 1}, false))
checkExists(t, true, cid, treeID)
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
checkExists(t, false, cid, "another tree") // same CID, different tree
@@ -570,16 +557,16 @@ func TestApplyTricky1(t *testing.T) {
}
treeID := "version"
d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
cid := cidtest.ID()
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
s := providers[i].construct(t)
for i := range ops {
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
}
for i := range expected {
_, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
_, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
require.NoError(t, err)
require.Equal(t, expected[i].parent, parent)
}
@@ -631,16 +618,16 @@ func TestApplyTricky2(t *testing.T) {
}
treeID := "version"
d := CIDDescriptor{CID: cidtest.ID(), Position: 0, Size: 1}
cid := cidtest.ID()
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
s := providers[i].construct(t)
for i := range ops {
require.NoError(t, s.TreeApply(d, treeID, &ops[i], false))
require.NoError(t, s.TreeApply(cid, treeID, &ops[i], false))
}
for i := range expected {
_, parent, err := s.TreeGetMeta(d.CID, treeID, expected[i].child)
_, parent, err := s.TreeGetMeta(cid, treeID, expected[i].child)
require.NoError(t, err)
require.Equal(t, expected[i].parent, parent)
}
@@ -746,12 +733,11 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
ops := prepareRandomTree(nodeCount, opCount)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
}
for i := 0; i < iterCount; i++ {
@@ -766,7 +752,7 @@ func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _
go func() {
defer wg.Done()
for op := range ch {
require.NoError(t, actual.TreeApply(d, treeID, op, false))
require.NoError(t, actual.TreeApply(cid, treeID, op, false))
}
}()
}
@@ -792,12 +778,11 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
ops := prepareRandomTree(nodeCount, opCount)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
require.NoError(t, expected.TreeApply(cid, treeID, &ops[i], false))
}
const iterCount = 200
@@ -807,7 +792,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ..
actual := constructor(t)
for i := range ops {
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
require.NoError(t, actual.TreeApply(cid, treeID, &ops[i], false))
}
compareForests(t, expected, actual, cid, treeID, nodeCount)
}
@@ -889,7 +874,6 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
ops := genFunc(b.N)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
ch := make(chan int, b.N)
for i := 0; i < b.N; i++ {
@@ -901,7 +885,7 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
b.SetParallelism(10)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
if err := s.TreeApply(cid, treeID, &ops[<-ch], false); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
}
@@ -918,7 +902,6 @@ func TestTreeGetByPath(t *testing.T) {
func testTreeGetByPath(t *testing.T, s Forest) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
// /
@@ -928,12 +911,12 @@ func testTreeGetByPath(t *testing.T, s Forest) {
// |- cat1.jpg, Version=XXX (4)
// |- cat1.jpg, Version=YYY (5)
// |- cat2.jpg, Version=ZZZ (6)
testMove(t, s, 0, 1, 0, d, treeID, "a", "")
testMove(t, s, 1, 2, 0, d, treeID, "b", "")
testMove(t, s, 2, 3, 1, d, treeID, "cat1.jpg", "TTT")
testMove(t, s, 3, 4, 2, d, treeID, "cat1.jpg", "XXX")
testMove(t, s, 4, 5, 2, d, treeID, "cat1.jpg", "YYY")
testMove(t, s, 5, 6, 2, d, treeID, "cat2.jpg", "ZZZ")
testMove(t, s, 0, 1, 0, cid, treeID, "a", "")
testMove(t, s, 1, 2, 0, cid, treeID, "b", "")
testMove(t, s, 2, 3, 1, cid, treeID, "cat1.jpg", "TTT")
testMove(t, s, 3, 4, 2, cid, treeID, "cat1.jpg", "XXX")
testMove(t, s, 4, 5, 2, cid, treeID, "cat1.jpg", "YYY")
testMove(t, s, 5, 6, 2, cid, treeID, "cat2.jpg", "ZZZ")
if mf, ok := s.(*memoryForest); ok {
single := mf.treeMap[cid.String()+"/"+treeID]
@@ -970,14 +953,14 @@ func testTreeGetByPath(t *testing.T, s Forest) {
})
}
func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor, treeID, filename, version string) {
func testMove(t *testing.T, s Forest, ts int, node, parent Node, cid cidSDK.ID, treeID, filename, version string) {
items := make([]KeyValue, 1, 2)
items[0] = KeyValue{AttributeFilename, []byte(filename)}
if version != "" {
items = append(items, KeyValue{AttributeVersion, []byte(version)})
}
require.NoError(t, s.TreeApply(d, treeID, &Move{
require.NoError(t, s.TreeApply(cid, treeID, &Move{
Parent: parent,
Child: node,
Meta: Meta{

@@ -18,7 +18,7 @@ type Forest interface {
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
// TreeApply applies replicated operation from another node.
// If background is true, TreeApply will first check whether an operation exists.
TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error
TreeApply(cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
// TreeGetByPath returns all nodes corresponding to the path.
// The path is constructed by descending from the root using the values of the
// AttributeFilename in meta.
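
This interface change drives most of the diff: `TreeApply` only ever used the container ID out of `CIDDescriptor`, so taking `cidSDK.ID` directly lets every implementation drop the `checkValid`/`ErrInvalidCIDDescriptor` path, along with the tests for it. A before/after sketch of the call shape, using hypothetical minimal types:

```go
package main

import (
	"errors"
	"fmt"
)

type cidID string

type move struct{ child, parent uint64 }

// Before: the descriptor carried position and size that TreeApply
// never used for anything except validation.
type cidDescriptor struct {
	CID      cidID
	Position int
	Size     int
}

func (d cidDescriptor) checkValid() bool { return 0 <= d.Position && d.Position < d.Size }

func treeApplyOld(d cidDescriptor, treeID string, m *move) error {
	if !d.checkValid() {
		return errors.New("cid descriptor is invalid")
	}
	fmt.Printf("apply %+v to %s/%s\n", *m, d.CID, treeID)
	return nil
}

// After: only the container ID is passed, so there is nothing left
// to validate and the error path disappears.
func treeApplyNew(cnr cidID, treeID string, m *move) error {
	fmt.Printf("apply %+v to %s/%s\n", *m, cnr, treeID)
	return nil
}

func main() {
	_ = treeApplyOld(cidDescriptor{CID: "cnr1", Position: 0, Size: 1}, "version", &move{child: 1})
	_ = treeApplyNew("cnr1", "version", &move{child: 1})
}
```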

@@ -155,7 +155,6 @@ func (s *Shard) Init() error {
return nil
}
// nolint: funlen
func (s *Shard) refillMetabase() error {
err := s.metaBase.Reset()
if err != nil {
@@ -172,57 +171,23 @@ func (s *Shard) refillMetabase() error {
return nil
}
// nolint: exhaustive
var err error
switch obj.Type() {
case objectSDK.TypeTombstone:
tombstone := objectSDK.NewTombstone()
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
}
tombAddr := object.AddressOf(obj)
memberIDs := tombstone.Members()
tombMembers := make([]oid.Address, 0, len(memberIDs))
for i := range memberIDs {
a := tombAddr
a.SetObject(memberIDs[i])
tombMembers = append(tombMembers, a)
}
var inhumePrm meta.InhumePrm
inhumePrm.SetTombstoneAddress(tombAddr)
inhumePrm.SetAddresses(tombMembers...)
_, err = s.metaBase.Inhume(inhumePrm)
if err != nil {
return fmt.Errorf("could not inhume objects: %w", err)
}
err = s.refillTombstoneObject(obj)
case objectSDK.TypeLock:
var lock objectSDK.Lock
if err := lock.Unmarshal(obj.Payload()); err != nil {
return fmt.Errorf("could not unmarshal lock content: %w", err)
}
locked := make([]oid.ID, lock.NumberOfMembers())
lock.ReadMembers(locked)
cnr, _ := obj.ContainerID()
id, _ := obj.ID()
err = s.metaBase.Lock(cnr, id, locked)
if err != nil {
return fmt.Errorf("could not lock objects: %w", err)
}
err = s.refillLockObject(obj)
default:
}
if err != nil {
return err
}
var mPrm meta.PutPrm
mPrm.SetObject(obj)
mPrm.SetStorageID(descriptor)
_, err := s.metaBase.Put(mPrm)
_, err = s.metaBase.Put(mPrm)
if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
return err
}
@@ -241,6 +206,54 @@ func (s *Shard) refillMetabase() error {
return nil
}
func (s *Shard) refillLockObject(obj *objectSDK.Object) error {
var lock objectSDK.Lock
if err := lock.Unmarshal(obj.Payload()); err != nil {
return fmt.Errorf("could not unmarshal lock content: %w", err)
}
locked := make([]oid.ID, lock.NumberOfMembers())
lock.ReadMembers(locked)
cnr, _ := obj.ContainerID()
id, _ := obj.ID()
err := s.metaBase.Lock(cnr, id, locked)
if err != nil {
return fmt.Errorf("could not lock objects: %w", err)
}
return nil
}
func (s *Shard) refillTombstoneObject(obj *objectSDK.Object) error {
tombstone := objectSDK.NewTombstone()
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
}
tombAddr := object.AddressOf(obj)
memberIDs := tombstone.Members()
tombMembers := make([]oid.Address, 0, len(memberIDs))
for i := range memberIDs {
a := tombAddr
a.SetObject(memberIDs[i])
tombMembers = append(tombMembers, a)
}
var inhumePrm meta.InhumePrm
inhumePrm.SetTombstoneAddress(tombAddr)
inhumePrm.SetAddresses(tombMembers...)
_, err := s.metaBase.Inhume(inhumePrm)
if err != nil {
return fmt.Errorf("could not inhume objects: %w", err)
}
return nil
}
// Close releases all Shard's components.
func (s *Shard) Close() error {
components := []interface{ Close() error }{}
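
Splitting `refillMetabase` into `refillTombstoneObject` and `refillLockObject` removes both the `funlen` and `exhaustive` suppressions: the `switch` now only dispatches, and its empty `default` case makes the intentional no-op explicit. A sketch of that dispatch shape, assuming a toy object type rather than the SDK one:

```go
package main

import "fmt"

type objType int

const (
	typeRegular objType = iota
	typeTombstone
	typeLock
)

type object struct{ typ objType }

func refillTombstoneObject(o *object) error { fmt.Println("inhume tombstone members"); return nil }
func refillLockObject(o *object) error      { fmt.Println("lock object members"); return nil }

// refillObject mirrors the refactored switch in refillMetabase:
// each special type is handled by a helper, all errors funnel
// through one check, and the default case is explicitly a no-op.
func refillObject(o *object) error {
	var err error
	switch o.typ {
	case typeTombstone:
		err = refillTombstoneObject(o)
	case typeLock:
		err = refillLockObject(o)
	default:
	}
	if err != nil {
		return err
	}
	fmt.Println("put object into metabase")
	return nil
}

func main() {
	for _, o := range []*object{{typeRegular}, {typeTombstone}, {typeLock}} {
		if err := refillObject(o); err != nil {
			fmt.Println("error:", err)
		}
	}
}
```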

@@ -42,7 +42,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
}
// TreeApply implements the pilorama.Forest interface.
func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move, backgroundSync bool) error {
func (s *Shard) TreeApply(cnr cidSDK.ID, treeID string, m *pilorama.Move, backgroundSync bool) error {
if s.pilorama == nil {
return ErrPiloramaDisabled
}
@@ -53,7 +53,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
return s.pilorama.TreeApply(d, treeID, m, backgroundSync)
return s.pilorama.TreeApply(cnr, treeID, m, backgroundSync)
}
// TreeGetByPath implements the pilorama.Forest interface.

@@ -426,7 +426,7 @@ func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object)
return nil
}
// nolint: funlen, gocognit
// nolint: funlen
func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp *objectV2.HeadResponse) (*getsvc.HeadPrm, error) {
body := req.GetBody()
@@ -459,138 +459,140 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
body: resp.GetBody(),
})
if !commonPrm.LocalOnly() {
var onceResign sync.Once
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
key, err := s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}
// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
}
// code below is copy-pasted from c.GetObjectHeader implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
// send Head request
var headResp *objectV2.HeadResponse
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
return err
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
var (
hdr *objectV2.Header
idSig *refs.Signature
)
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
case *objectV2.ShortHeader:
if !body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
}
h := v
hdr = new(objectV2.Header)
hdr.SetPayloadLength(h.GetPayloadLength())
hdr.SetVersion(h.GetVersion())
hdr.SetOwnerID(h.GetOwnerID())
hdr.SetObjectType(h.GetObjectType())
hdr.SetCreationEpoch(h.GetCreationEpoch())
hdr.SetPayloadHash(h.GetPayloadHash())
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
case *objectV2.HeaderWithSignature:
if body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
}
hdrWithSig := v
if hdrWithSig == nil {
return nil, errors.New("nil object part")
}
hdr = hdrWithSig.GetHeader()
idSig = hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, errors.New("missing signature")
}
binID, err := objAddr.Object().Marshal()
if err != nil {
return nil, fmt.Errorf("marshal ID: %w", err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binID) {
return nil, errors.New("invalid object ID signature")
}
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
objv2 := new(objectV2.Object)
objv2.SetHeader(hdr)
objv2.SetSignature(idSig)
obj := object.NewFromV2(objv2)
obj.SetID(objAddr.Object())
// convert the object
return obj, nil
}))
if commonPrm.LocalOnly() {
return p, nil
}
var onceResign sync.Once
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
key, err := s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}
// once compose and resign forwarding request
onceResign.Do(func() {
// compose meta header of the local server
metaHdr := new(session.RequestMetaHeader)
metaHdr.SetTTL(meta.GetTTL() - 1)
// TODO: #1165 think how to set the other fields
metaHdr.SetOrigin(meta)
writeCurrentVersion(metaHdr)
req.SetMetaHeader(metaHdr)
err = signature.SignServiceMessage(key, req)
})
if err != nil {
return nil, err
}
// code below is copy-pasted from c.GetObjectHeader implementation,
// perhaps it is worth highlighting the utility function in frostfs-api-go
// send Head request
var headResp *objectV2.HeadResponse
err = c.RawForAddress(addr, func(cli *rpcclient.Client) error {
headResp, err = rpc.HeadObject(cli, req, rpcclient.WithContext(ctx))
return err
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
}
// verify response key
if err = internal.VerifyResponseKeyV2(pubkey, headResp); err != nil {
return nil, err
}
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return nil, fmt.Errorf("response verification failed: %w", err)
}
if err = checkStatus(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, err
}
var (
hdr *objectV2.Header
idSig *refs.Signature
)
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
case *objectV2.ShortHeader:
if !body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
}
h := v
hdr = new(objectV2.Header)
hdr.SetPayloadLength(h.GetPayloadLength())
hdr.SetVersion(h.GetVersion())
hdr.SetOwnerID(h.GetOwnerID())
hdr.SetObjectType(h.GetObjectType())
hdr.SetCreationEpoch(h.GetCreationEpoch())
hdr.SetPayloadHash(h.GetPayloadHash())
hdr.SetHomomorphicHash(h.GetHomomorphicHash())
case *objectV2.HeaderWithSignature:
if body.GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
}
hdrWithSig := v
if hdrWithSig == nil {
return nil, errors.New("nil object part")
}
hdr = hdrWithSig.GetHeader()
idSig = hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, errors.New("missing signature")
}
binID, err := objAddr.Object().Marshal()
if err != nil {
return nil, fmt.Errorf("marshal ID: %w", err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binID) {
return nil, errors.New("invalid object ID signature")
}
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
}
objv2 := new(objectV2.Object)
objv2.SetHeader(hdr)
objv2.SetSignature(idSig)
obj := object.NewFromV2(objv2)
obj.SetID(objAddr.Object())
// convert the object
return obj, nil
}))
return p, nil
}

@@ -27,7 +27,7 @@ type replicationTask struct {
type applyOp struct {
treeID string
pilorama.CIDDescriptor
cid cidSDK.ID
pilorama.Move
}
@@ -43,7 +43,7 @@ func (s *Service) localReplicationWorker() {
case <-s.closeCh:
return
case op := <-s.replicateLocalCh:
err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
err := s.forest.TreeApply(op.cid, op.treeID, &op.Move, false)
if err != nil {
s.log.Error("failed to apply replicated operation",
zap.String("err", err.Error()))
@@ -52,7 +52,7 @@ func (s *Service) localReplicationWorker() {
}
}
func (s *Service) replicationWorker() {
func (s *Service) replicationWorker(ctx context.Context) {
for {
select {
case <-s.closeCh:
@@ -64,13 +64,13 @@ func (s *Service) replicationWorker() {
task.n.IterateNetworkEndpoints(func(addr string) bool {
lastAddr = addr
c, err := s.cache.get(context.Background(), addr)
c, err := s.cache.get(ctx, addr)
if err != nil {
lastErr = fmt.Errorf("can't create client: %w", err)
return false
}
ctx, cancel := context.WithTimeout(context.Background(), s.replicatorTimeout)
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
_, lastErr = c.Apply(ctx, task.req)
cancel()
@@ -94,8 +94,7 @@ func (s *Service) replicationWorker() {
func (s *Service) replicateLoop(ctx context.Context) {
for i := 0; i < s.replicatorWorkerCount; i++ {
//nolint: contextcheck
go s.replicationWorker()
go s.replicationWorker(ctx)
go s.localReplicationWorker()
}
defer func() {
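
The `replicationWorker(ctx)` change addresses what `contextcheck` warns about: a worker goroutine built on `context.Background()` cannot be cancelled by its caller. Passing `ctx` down from `replicateLoop` and deriving the per-request timeout from it fixes that. A compact sketch of the pattern, with a hypothetical `apply` standing in for the real RPC:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// apply stands in for the real Apply RPC; it honours ctx cancellation.
func apply(ctx context.Context, task int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond):
		fmt.Println("applied task", task)
		return nil
	}
}

// worker derives every per-request timeout from the parent ctx, so
// cancelling the parent also stops in-flight requests, which is the
// property contextcheck wants and what replicationWorker(ctx) now has.
func worker(ctx context.Context, tasks <-chan int) {
	for {
		select {
		case <-ctx.Done():
			return
		case task := <-tasks:
			reqCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
			if err := apply(reqCtx, task); err != nil {
				fmt.Println("apply failed:", err)
			}
			cancel()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	tasks := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		tasks <- i
	}
	go worker(ctx, tasks)
	time.Sleep(100 * time.Millisecond)
	cancel()
	time.Sleep(20 * time.Millisecond)
}
```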

@@ -468,7 +468,7 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
key := req.GetSignature().GetKey()
_, pos, size, err := s.getContainerInfo(cid, key)
_, pos, _, err := s.getContainerInfo(cid, key)
if err != nil {
return nil, err
}
@@ -485,8 +485,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
select {
case s.replicateLocalCh <- applyOp{
treeID: req.GetBody().GetTreeId(),
CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
treeID: req.GetBody().GetTreeId(),
cid: cid,
Move: pilorama.Move{
Parent: op.GetParentId(),
Child: op.GetChildId(),

@@ -14,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
@@ -40,11 +41,6 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
return ErrNotInContainer
}
var d pilorama.CIDDescriptor
d.CID = cid
d.Position = pos
d.Size = len(nodes)
nodes = randomizeNodeOrder(nodes, pos)
if len(nodes) == 0 {
return nil
@@ -87,18 +83,18 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
}
for _, tid := range treesToSync {
h, err := s.forest.TreeLastSyncHeight(d.CID, tid)
h, err := s.forest.TreeLastSyncHeight(cid, tid)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
s.log.Warn("could not get last synchronized height for a tree",
zap.Stringer("cid", d.CID),
zap.Stringer("cid", cid),
zap.String("tree", tid))
continue
}
newHeight := s.synchronizeTree(ctx, d, h, tid, nodes)
newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes)
if h < newHeight {
if err := s.forest.TreeUpdateLastSyncHeight(d.CID, tid, newHeight); err != nil {
if err := s.forest.TreeUpdateLastSyncHeight(cid, tid, newHeight); err != nil {
s.log.Warn("could not update last synchronized height for a tree",
zap.Stringer("cid", d.CID),
zap.Stringer("cid", cid),
zap.String("tree", tid))
}
}
@@ -118,24 +114,19 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
return ErrNotInContainer
}
var d pilorama.CIDDescriptor
d.CID = cid
d.Position = pos
d.Size = len(nodes)
nodes = randomizeNodeOrder(nodes, pos)
if len(nodes) == 0 {
return nil
}
s.synchronizeTree(ctx, d, 0, treeID, nodes)
s.synchronizeTree(ctx, cid, 0, treeID, nodes)
return nil
}
func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, from uint64,
func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint64,
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
s.log.Debug("synchronize tree",
zap.Stringer("cid", d.CID),
zap.Stringer("cid", cid),
zap.String("tree", treeID),
zap.Uint64("from", from))
@@ -157,7 +148,7 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
treeClient := NewTreeServiceClient(cc)
for {
h, err := s.synchronizeSingle(ctx, d, treeID, height, treeClient)
h, err := s.synchronizeSingle(ctx, cid, treeID, height, treeClient)
if height < h {
height = h
}
@@ -179,9 +170,9 @@ func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor,
return newHeight
}
func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescriptor, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
func (s *Service) synchronizeSingle(ctx context.Context, cid cidSDK.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
rawCID := make([]byte, sha256.Size)
d.CID.Encode(rawCID)
cid.Encode(rawCID)
for {
newHeight := height
@@ -211,7 +202,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto
if err := m.Meta.FromBytes(lm.Meta); err != nil {
return newHeight, err
}
if err := s.forest.TreeApply(d, treeID, m, true); err != nil {
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
return newHeight, err
}
if m.Time > newHeight {
@@ -255,7 +246,6 @@ func (s *Service) SynchronizeAll() error {
}
}
// nolint: funlen, gocognit
func (s *Service) syncLoop(ctx context.Context) {
for {
select {
@@ -272,86 +262,99 @@ func (s *Service) syncLoop(ctx context.Context) {
continue
}
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
cnrsToSync := make([]cid.ID, 0, len(cnrs))
newMap, cnrsToSync := s.containersToSync(cnrs)
var removed []cid.ID
for _, cnr := range cnrs {
_, pos, err := s.getContainerNodes(cnr)
if err != nil {
s.log.Error("could not calculate container nodes",
zap.Stringer("cid", cnr),
zap.Error(err))
continue
}
s.syncContainers(ctx, cnrsToSync)
if pos < 0 {
// node is not included in the container.
continue
}
newMap[cnr] = struct{}{}
cnrsToSync = append(cnrsToSync, cnr)
}
// sync new containers
var wg sync.WaitGroup
for _, cnr := range cnrsToSync {
wg.Add(1)
cnr := cnr
err := s.syncPool.Submit(func() {
defer wg.Done()
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
err := s.synchronizeAllTrees(ctx, cnr)
if err != nil {
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
return
}
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
})
if err != nil {
wg.Done()
s.log.Error("could not query trees for synchronization",
zap.Stringer("cid", cnr),
zap.Error(err))
if errors.Is(err, ants.ErrPoolClosed) {
return
}
}
}
wg.Wait()
s.cnrMapMtx.Lock()
for cnr := range s.cnrMap {
if _, ok := newMap[cnr]; ok {
continue
}
removed = append(removed, cnr)
}
for i := range removed {
delete(s.cnrMap, removed[i])
}
s.cnrMapMtx.Unlock()
for _, cnr := range removed {
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
err = s.DropTree(ctx, cnr, "")
if err != nil {
s.log.Error("could not remove redundant tree",
zap.Stringer("cid", cnr),
zap.Error(err))
continue
}
}
s.removeContainers(ctx, newMap)
s.log.Debug("trees have been synchronized")
}
}
}
func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
// sync new containers
var wg sync.WaitGroup
for _, cnr := range cnrs {
wg.Add(1)
cnr := cnr
err := s.syncPool.Submit(func() {
defer wg.Done()
s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
err := s.synchronizeAllTrees(ctx, cnr)
if err != nil {
s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
return
}
s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
})
if err != nil {
wg.Done()
s.log.Error("could not query trees for synchronization",
zap.Stringer("cid", cnr),
zap.Error(err))
if errors.Is(err, ants.ErrPoolClosed) {
return
}
}
}
wg.Wait()
}
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
s.cnrMapMtx.Lock()
defer s.cnrMapMtx.Unlock()
var removed []cid.ID
for cnr := range s.cnrMap {
if _, ok := newContainers[cnr]; ok {
continue
}
removed = append(removed, cnr)
}
for i := range removed {
delete(s.cnrMap, removed[i])
}
for _, cnr := range removed {
s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
err := s.DropTree(ctx, cnr, "")
if err != nil {
s.log.Error("could not remove redundant tree",
zap.Stringer("cid", cnr),
zap.Error(err))
}

looks redundant

`continue` or `log`?

Fixed.

nice thread, awesome comments
}
}
func (s *Service) containersToSync(cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) {
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
cnrsToSync := make([]cid.ID, 0, len(cnrs))
for _, cnr := range cnrs {
_, pos, err := s.getContainerNodes(cnr)
if err != nil {
s.log.Error("could not calculate container nodes",
zap.Stringer("cid", cnr),
zap.Error(err))
continue
}
if pos < 0 {
// node is not included in the container.
continue
}
newMap[cnr] = struct{}{}
cnrsToSync = append(cnrsToSync, cnr)
}
return newMap, cnrsToSync
}
// randomizeNodeOrder shuffles nodes and removes not a `pos` index.
// It is assumed that 0 <= pos < len(nodes).
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {