Compare commits

...

2 commits

Author SHA1 Message Date
03a9a1a516 [#1233] cli: Drop debugee from object nodes
All checks were successful
DCO action / DCO (pull_request) Successful in 2m45s
Build / Build Components (1.21) (pull_request) Successful in 3m17s
Vulncheck / Vulncheck (pull_request) Successful in 3m37s
Build / Build Components (1.22) (pull_request) Successful in 5m55s
Pre-commit hooks / Pre-commit (pull_request) Successful in 6m29s
Tests and linters / gopls check (pull_request) Successful in 7m24s
Tests and linters / Staticcheck (pull_request) Successful in 8m8s
Tests and linters / Lint (pull_request) Successful in 8m52s
Tests and linters / Tests (1.21) (pull_request) Successful in 12m16s
Tests and linters / Tests (1.22) (pull_request) Successful in 12m20s
Tests and linters / Tests with -race (pull_request) Successful in 12m24s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-08 17:55:23 +03:00
870c6254b8 [#1233] putSvc: Try to put EC chunk to any node
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-07-08 17:55:23 +03:00
2 changed files with 80 additions and 12 deletions

View file

@ -497,7 +497,6 @@ func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID,
if errors.As(err, &notFound) || errors.As(err, &removed) { if errors.As(err, &notFound) || errors.As(err, &removed) {
return false, nil return false, nil
} }
cmd.Printf("failed to get object %s from client\n", objID.EncodeToString())
return false, err return false, err
} }

View file

@ -3,8 +3,10 @@ package putsvc
import ( import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
@ -160,7 +162,7 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error
} }
eg.Go(func() error { eg.Go(func() error {
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes) return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes, make([]atomic.Bool, len(nodes)))
}) })
t.SubmitSuccess() t.SubmitSuccess()
} }
@ -195,10 +197,15 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
break break
} }
visited := make([]atomic.Bool, len(nodes))
for idx := range parts {
visited[idx%len(nodes)].Store(true)
}
for idx := range parts { for idx := range parts {
idx := idx idx := idx
eg.Go(func() error { eg.Go(func() error {
return e.writePart(egCtx, parts[idx], idx, nodes) return e.writePart(egCtx, parts[idx], idx, nodes, visited)
}) })
t.SubmitSuccess() t.SubmitSuccess()
} }
@ -211,19 +218,81 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
return nil return nil
} }
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error { func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
var err error select {
node := nodes[partIdx%len(nodes)] case <-ctx.Done():
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { return ctx.Err()
err = e.writePartLocal(ctx, obj) default:
} else {
err = e.writePartRemote(ctx, obj, node)
} }
// try to save to node for current part index
node := nodes[partIdx%len(nodes)]
err := e.putECPartToNode(ctx, obj, node)
if err == nil { if err == nil {
return nil return nil
} }
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("parent_object", object.AddressOf(obj)), zap.Error(err)) e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
return err zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
partVisited := make([]bool, len(nodes))
partVisited[partIdx%len(nodes)] = true
// try to save to any node not visited by any of other parts
for i := 1; i < len(nodes); i++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
idx := (partIdx + i) % len(nodes)
if !visited[idx].CompareAndSwap(false, true) {
continue
}
node = nodes[idx]
err := e.putECPartToNode(ctx, obj, node)
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err))
partVisited[idx] = true
}
// try to save to any node not visited by current part
for i := 0; i < len(nodes); i++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if partVisited[i] {
continue
}
node = nodes[i]
err := e.putECPartToNode(ctx, obj, node)
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err))
}
return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
}
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
return e.writePartLocal(ctx, obj)
}
return e.writePartRemote(ctx, obj, node)
} }
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {