frostfs-node/pkg/services/object/put/ec.go
Anton Nikiforov 990e984023
[#1427] object: Fix Put for EC object when node unavailable
There might be a situation when the context is canceled before the traverser moves on to the next set of nodes.
To avoid this, we need to wait for the result of each concurrent put at every traverser iteration.

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-11-12 12:55:31 +03:00
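
The shape of the fix, as a minimal standalone sketch rather than the actual ec.go code: the package name `sketch`, the `traverser` interface, `putAllParts`, and `putPart` below are hypothetical stand-ins for the placement traverser and the per-part put. The point it illustrates is that the errgroup is created and awaited inside each traverser iteration, so the traverser only asks for the next batch of nodes after every in-flight put has returned.

// Minimal sketch, not the actual ec.go code: traverser and putPart are
// hypothetical stand-ins for the placement traverser and the per-part put.
package sketch

import (
	"context"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

type traverser interface {
	// Next returns the next batch of candidate nodes,
	// or an empty slice when the traversal is exhausted.
	Next() []string
}

// putAllParts waits for the concurrent puts of every traverser iteration
// before asking the traverser for the next batch of nodes.
func putAllParts(ctx context.Context, t traverser, parts []string,
	putPart func(ctx context.Context, part, node string) error,
) error {
	processed := make([]atomic.Bool, len(parts))
	var err error
	for {
		nodes := t.Next()
		if len(nodes) == 0 {
			break
		}
		eg, egCtx := errgroup.WithContext(ctx)
		for idx := range parts {
			if processed[idx].Load() {
				continue
			}
			idx := idx
			eg.Go(func() error {
				if putErr := putPart(egCtx, parts[idx], nodes[idx%len(nodes)]); putErr != nil {
					return putErr
				}
				processed[idx].Store(true)
				return nil
			})
		}
		// Wait inside the loop: a canceled context or a failed put is
		// observed here, before the next t.Next() call, so no put of the
		// current iteration is still running when the traverser advances.
		err = eg.Wait()
	}
	if err != nil {
		return fmt.Errorf("incomplete put: %w", err)
	}
	return nil
}

The real writeRawObject below follows the same per-iteration Wait, but additionally rotates failed parts over the not-yet-visited nodes via writePart.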


package putsvc
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var _ transformer.ObjectWriter = (*ecWriter)(nil)
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
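// ecWriter writes objects to container nodes as erasure-coded parts.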
type ecWriter struct {
cfg *cfg
placementOpts []placement.Option
container containerSDK.Container
key *ecdsa.PrivateKey
commonPrm *svcutil.CommonPrm
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
objMeta object.ContentMeta
objMetaValid bool
}
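// WriteObject implements transformer.ObjectWriter. It relays the object to a
// container node if the local node is not one, validates the payload content,
// and then either writes a single EC part (if the object already carries an
// EC header) or splits the raw object into EC parts.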
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil {
return err
}
if relayed {
return nil
}
if !object.IsECSupported(obj) {
// must be resolved by caller
return errUnsupportedECObject
}
if !e.objMetaValid {
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
}
e.objMetaValid = true
}
if obj.ECHeader() != nil {
return e.writeECPart(ctx, obj)
}
return e.writeRawObject(ctx, obj)
}
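// relayIfNotContainerNode relays the object to a container node when the local
// node does not belong to the container. It reports whether the object was relayed.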
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.relay == nil {
return false, nil
}
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
if err != nil {
return false, err
}
if currentNodeIsContainerNode {
// object can be split or saved locally
return false, nil
}
objID := object.AddressOf(obj).Object()
var index uint32
if obj.ECHeader() != nil {
objID = obj.ECHeader().Parent()
index = obj.ECHeader().Index()
}
if err := e.relayToContainerNode(ctx, objID, index); err != nil {
return false, err
}
return true, nil
}
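// currentNodeIsContainerNode reports whether the local node is among the
// container nodes returned by the placement traverser.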
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
t, err := placement.NewTraverser(e.placementOpts...)
if err != nil {
return false, err
}
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
for _, node := range nodes {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
return true, nil
}
}
}
return false, nil
}
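// relayToContainerNode forwards the object to container nodes one by one,
// starting from the node selected by the EC part index, and stops at the first
// node that accepts it; if none does, it returns errIncompletePut with the last error.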
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
if err != nil {
return err
}
var lastErr error
offset := int(index)
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
for idx := range nodes {
node := nodes[(idx+offset)%len(nodes)]
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
c, err := e.cfg.clientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
defer close(completed)
err = e.relay(ctx, info, c)
}); poolErr != nil {
close(completed)
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
return poolErr
}
<-completed
if err == nil {
return nil
}
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
lastErr = err
}
}
if lastErr == nil {
return nil
}
return errIncompletePut{
singleErr: lastErr,
}
}
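// writeECPart writes an already formed EC part: locally for LocalOnly
// requests, otherwise to the container nodes selected for the part's parent object.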
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
if e.commonPrm.LocalOnly() {
return e.writePartLocal(ctx, obj)
}
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
for {
nodes := t.Next()
if len(nodes) == 0 {
break
}
eg.Go(func() error {
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes, make([]atomic.Bool, len(nodes)))
})
t.SubmitSuccess()
}
if err := eg.Wait(); err != nil {
return errIncompletePut{
singleErr: err,
}
}
return nil
}
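// writeRawObject splits the object into EC parts and writes them to the
// container nodes. The errgroup is created and awaited inside every traverser
// iteration, so the traverser advances only after all puts scheduled for the
// current batch of nodes have finished.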
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
// currently only a single EC policy is supported
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
if err != nil {
return err
}
parts, err := c.Split(obj, e.key)
if err != nil {
return err
}
partsProcessed := make([]atomic.Bool, len(parts))
objID, _ := obj.ID()
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
if err != nil {
return err
}
for {
eg, egCtx := errgroup.WithContext(ctx)
nodes := t.Next()
if len(nodes) == 0 {
break
}
visited := make([]atomic.Bool, len(nodes))
for idx := range parts {
visited[idx%len(nodes)].Store(true)
}
for idx := range parts {
if !partsProcessed[idx].Load() {
eg.Go(func() error {
err := e.writePart(egCtx, parts[idx], idx, nodes, visited)
if err == nil {
partsProcessed[idx].Store(true)
t.SubmitSuccess()
}
return err
})
}
}
err = eg.Wait()
}
if err != nil {
return errIncompletePut{
singleErr: err,
}
}
return nil
}
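// writePart writes a single EC part: first to the node matching the part
// index, then to nodes not yet claimed by other parts, and finally to any node
// this part has not tried yet.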
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// try to save to the node for the current part index
node := nodes[partIdx%len(nodes)]
err := e.putECPartToNode(ctx, obj, node)
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
partVisited := make([]bool, len(nodes))
partVisited[partIdx%len(nodes)] = true
// try to save to any node not yet visited by any of the other parts
for i := 1; i < len(nodes); i++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
idx := (partIdx + i) % len(nodes)
if !visited[idx].CompareAndSwap(false, true) {
continue
}
node = nodes[idx]
err := e.putECPartToNode(ctx, obj, node)
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err))
partVisited[idx] = true
}
// try to save to any node not yet visited by the current part
for i := 0; i < len(nodes); i++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if partVisited[i] {
continue
}
node = nodes[i]
err := e.putECPartToNode(ctx, obj, node)
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err))
}
return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
}
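// putECPartToNode stores the part locally if the target node is the local one,
// otherwise sends it to the remote node.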
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
return e.writePartLocal(ctx, obj)
}
return e.writePartRemote(ctx, obj, node)
}
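// writePartLocal writes the part to the local storage through the local worker
// pool and waits for the result.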
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
var err error
localTarget := localTarget{
storage: e.cfg.localStore,
}
completed := make(chan interface{})
if poolErr := e.cfg.localPool.Submit(func() {
defer close(completed)
err = localTarget.WriteObject(ctx, obj, e.objMeta)
}); poolErr != nil {
close(completed)
return poolErr
}
<-completed
return err
}
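// writePartRemote sends the part to the given remote node through the remote
// worker pool and waits for the result.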
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
var clientNodeInfo client.NodeInfo
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
remoteTarget := remoteTarget{
privateKey: e.key,
clientConstructor: e.cfg.clientConstructor,
commonPrm: e.commonPrm,
nodeInfo: clientNodeInfo,
}
var err error
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
defer close(completed)
err = remoteTarget.WriteObject(ctx, obj, e.objMeta)
}); poolErr != nil {
close(completed)
return poolErr
}
<-completed
return err
}