Fix EC put when some node is off #1233
|
@ -497,7 +497,6 @@ func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID,
|
||||||
if errors.As(err, ¬Found) || errors.As(err, &removed) {
|
if errors.As(err, ¬Found) || errors.As(err, &removed) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
cmd.Printf("failed to get object %s from client\n", objID.EncodeToString())
|
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,8 +3,10 @@ package putsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
@ -160,7 +162,7 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes)
|
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes, make([]atomic.Bool, len(nodes)))
|
||||||
})
|
})
|
||||||
t.SubmitSuccess()
|
t.SubmitSuccess()
|
||||||
}
|
}
|
||||||
|
@ -195,10 +197,15 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
visited := make([]atomic.Bool, len(nodes))
|
||||||
|
for idx := range parts {
|
||||||
fyrchik
commented
What does this line achieve? If we have 2 loops in What does this line achieve? If we have 2 loops in `writeECPart`, we will still iterate over all nodes.
dstepanov-yadro
commented
If there are 3 parts and 4 nodes, and part 3 fails, so part 3 should try to save to node 4, not node 1 or node 2 (if node 1 or node 2 goroutines are not started yet). If there are 3 parts and 4 nodes, and part 3 fails, so part 3 should try to save to node 4, not node 1 or node 2 (if node 1 or node 2 goroutines are not started yet).
|
|||||||
|
visited[idx%len(nodes)].Store(true)
|
||||||
|
}
|
||||||
|
|
||||||
for idx := range parts {
|
for idx := range parts {
|
||||||
idx := idx
|
idx := idx
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
return e.writePart(egCtx, parts[idx], idx, nodes)
|
return e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||||
})
|
})
|
||||||
t.SubmitSuccess()
|
t.SubmitSuccess()
|
||||||
}
|
}
|
||||||
|
@ -211,19 +218,81 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error {
|
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
|
||||||
var err error
|
select {
|
||||||
node := nodes[partIdx%len(nodes)]
|
case <-ctx.Done():
|
||||||
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
return ctx.Err()
|
||||||
err = e.writePartLocal(ctx, obj)
|
default:
|
||||||
} else {
|
|
||||||
err = e.writePartRemote(ctx, obj, node)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// try to save to node for current part index
|
||||||
|
node := nodes[partIdx%len(nodes)]
|
||||||
|
err := e.putECPartToNode(ctx, obj, node)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("parent_object", object.AddressOf(obj)), zap.Error(err))
|
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||||
return err
|
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||||
|
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
|
||||||
|
|
||||||
|
partVisited := make([]bool, len(nodes))
|
||||||
|
partVisited[partIdx%len(nodes)] = true
|
||||||
|
|
||||||
|
// try to save to any node not visited by any of other parts
|
||||||
|
for i := 1; i < len(nodes); i++ {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
idx := (partIdx + i) % len(nodes)
|
||||||
|
if !visited[idx].CompareAndSwap(false, true) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
node = nodes[idx]
|
||||||
|
err := e.putECPartToNode(ctx, obj, node)
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||||
|
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||||
|
zap.String("node", hex.EncodeToString(node.PublicKey())),
|
||||||
|
zap.Error(err))
|
||||||
|
|
||||||
|
partVisited[idx] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// try to save to any node not visited by current part
|
||||||
fyrchik
commented
If we are here, it means that either The question is: why do we need 2 loops? If we are here, it means that either `partVisited[idx] == true` or we skipped some iteration from the previous loop in `if !visited[idx].CompareAndSwap(false, true)`. If `partVisited[idx] == true`, we skip the node, so this loop iterates over those `visited[idx]` which it skipped in the previous loop.
The question is: why do we need 2 loops?
dstepanov-yadro
commented
It there are 3 parts and 4 nodes and part with index 2 fails on nodes with index 2 and 3, then state after first iteration will be:
So after first iteration part 3 will try to save to node 0 and 1. It there are 3 parts and 4 nodes and part with index 2 fails on nodes with index 2 and 3, then state after first iteration will be:
```
visited[0] = true // visited by part 0
visited[1] = true // visited by part 1
visited[2] = true // visited by part 2 and failed
visited[3] = true // visited by part 2 and failed
```
So after first iteration part 3 will try to save to node 0 and 1.
|
|||||||
|
for i := 0; i < len(nodes); i++ {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if partVisited[i] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
node = nodes[i]
|
||||||
|
err := e.putECPartToNode(ctx, obj, node)
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||||
|
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||||
|
zap.String("node", hex.EncodeToString(node.PublicKey())),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||||
|
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||||
|
return e.writePartLocal(ctx, obj)
|
||||||
|
}
|
||||||
|
return e.writePartRemote(ctx, obj, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||||
|
|
Irrelevant to the commit.
fixed