forked from TrueCloudLab/frostfs-node
[#1233] putSvc: Try to put EC chunk to any node
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
6ef38c07bd
commit
84ecd61dfd
1 changed files with 80 additions and 11 deletions
|
@ -3,8 +3,10 @@ package putsvc
|
|||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
|
@ -160,7 +162,7 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error
|
|||
}
|
||||
|
||||
eg.Go(func() error {
|
||||
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes)
|
||||
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes, make([]atomic.Bool, len(nodes)))
|
||||
})
|
||||
t.SubmitSuccess()
|
||||
}
|
||||
|
@ -195,10 +197,15 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
break
|
||||
}
|
||||
|
||||
visited := make([]atomic.Bool, len(nodes))
|
||||
for idx := range parts {
|
||||
visited[idx%len(nodes)].Store(true)
|
||||
}
|
||||
|
||||
for idx := range parts {
|
||||
idx := idx
|
||||
eg.Go(func() error {
|
||||
return e.writePart(egCtx, parts[idx], idx, nodes)
|
||||
return e.writePart(egCtx, parts[idx], idx, nodes, visited)
|
||||
})
|
||||
t.SubmitSuccess()
|
||||
}
|
||||
|
@ -211,19 +218,81 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error {
|
||||
var err error
|
||||
node := nodes[partIdx%len(nodes)]
|
||||
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
err = e.writePartLocal(ctx, obj)
|
||||
} else {
|
||||
err = e.writePartRemote(ctx, obj, node)
|
||||
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// try to save to node for current part index
|
||||
node := nodes[partIdx%len(nodes)]
|
||||
err := e.putECPartToNode(ctx, obj, node)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("parent_object", object.AddressOf(obj)), zap.Error(err))
|
||||
return err
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
|
||||
|
||||
partVisited := make([]bool, len(nodes))
|
||||
partVisited[partIdx%len(nodes)] = true
|
||||
|
||||
// try to save to any node not visited by any of other parts
|
||||
for i := 1; i < len(nodes); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
idx := (partIdx + i) % len(nodes)
|
||||
if !visited[idx].CompareAndSwap(false, true) {
|
||||
continue
|
||||
}
|
||||
node = nodes[idx]
|
||||
err := e.putECPartToNode(ctx, obj, node)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||
zap.String("node", hex.EncodeToString(node.PublicKey())),
|
||||
zap.Error(err))
|
||||
|
||||
partVisited[idx] = true
|
||||
}
|
||||
|
||||
// try to save to any node not visited by current part
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if partVisited[i] {
|
||||
continue
|
||||
}
|
||||
node = nodes[i]
|
||||
err := e.putECPartToNode(ctx, obj, node)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||
zap.String("node", hex.EncodeToString(node.PublicKey())),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
|
||||
}
|
||||
|
||||
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
return e.writePartLocal(ctx, obj)
|
||||
}
|
||||
return e.writePartRemote(ctx, obj, node)
|
||||
}
|
||||
|
||||
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||
|
|
Loading…
Reference in a new issue