diff --git a/pkg/services/object/put/ec.go b/pkg/services/object/put/ec.go index 6237872f..6da50195 100644 --- a/pkg/services/object/put/ec.go +++ b/pkg/services/object/put/ec.go @@ -3,8 +3,10 @@ package putsvc import ( "context" "crypto/ecdsa" + "encoding/hex" "errors" "fmt" + "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" @@ -160,7 +162,7 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error } eg.Go(func() error { - return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes) + return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes, make([]atomic.Bool, len(nodes))) }) t.SubmitSuccess() } @@ -195,10 +197,15 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er break } + visited := make([]atomic.Bool, len(nodes)) + for idx := range parts { + visited[idx%len(nodes)].Store(true) + } + for idx := range parts { idx := idx eg.Go(func() error { - return e.writePart(egCtx, parts[idx], idx, nodes) + return e.writePart(egCtx, parts[idx], idx, nodes, visited) }) t.SubmitSuccess() } @@ -211,19 +218,81 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er return nil } -func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error { - var err error - node := nodes[partIdx%len(nodes)] - if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { - err = e.writePartLocal(ctx, obj) - } else { - err = e.writePartRemote(ctx, obj, node) +func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: } + + // try to save to node for current part index + node := nodes[partIdx%len(nodes)] + err := e.putECPartToNode(ctx, obj, node) if err == nil { return nil } - e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("parent_object", object.AddressOf(obj)), zap.Error(err)) - return err + e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), + zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), + zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) + + partVisited := make([]bool, len(nodes)) + partVisited[partIdx%len(nodes)] = true + + // try to save to any node not visited by any of other parts + for i := 1; i < len(nodes); i++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + idx := (partIdx + i) % len(nodes) + if !visited[idx].CompareAndSwap(false, true) { + continue + } + node = nodes[idx] + err := e.putECPartToNode(ctx, obj, node) + if err == nil { + return nil + } + e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), + zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), + zap.String("node", hex.EncodeToString(node.PublicKey())), + zap.Error(err)) + + partVisited[idx] = true + } + + // try to save to any node not visited by current part + for i := 0; i < len(nodes); i++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if partVisited[i] { + continue + } + node = nodes[i] + err := e.putECPartToNode(ctx, obj, node) + if err == nil { + return nil + } + e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), + zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), + zap.String("node", hex.EncodeToString(node.PublicKey())), + zap.Error(err)) + } + + return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj)) +} + +func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { + if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { + return e.writePartLocal(ctx, obj) + } + return e.writePartRemote(ctx, obj, node) } func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {