frostfs-node/pkg/services/object/put/ec.go

350 lines
8.9 KiB
Go
Raw Normal View History

package putsvc
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
	// Compile-time assertion that *ecWriter satisfies transformer.ObjectWriter.
	_ transformer.ObjectWriter = (*ecWriter)(nil)

	// errUnsupportedECObject is returned by ecWriter.WriteObject when
	// object.IsECSupported reports false for the given object.
	errUnsupportedECObject = errors.New("object is not supported for erasure coding")
)
// ecWriter is a transformer.ObjectWriter that stores objects according to an
// erasure-coding placement: an object that already carries an EC header is
// written as a single part, any other supported object is split into parts
// first. When a relay callback is set and the local node is not among the
// container nodes, the object is relayed to a container node instead.
type ecWriter struct {
// cfg is the shared service configuration; this file reads its netmapKeys,
// fmtValidator, clientConstructor, remotePool, localPool, localStore and log.
cfg *cfg
// placementOpts are the base options handed to placement.NewTraverser; the
// methods below add placement.ForObject on top of them.
placementOpts []placement.Option
// container supplies the placement policy from which the EC data and parity
// counts are read.
container containerSDK.Container
// key is passed to the erasure-code constructor's Split and used as the
// private key of remote targets.
key *ecdsa.PrivateKey
// commonPrm is consulted for LocalOnly and forwarded to remote targets.
commonPrm *svcutil.CommonPrm
// relay, when non-nil, is invoked to forward the request to a container node;
// a nil relay disables relaying entirely.
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
// objMeta caches the result of fmtValidator.ValidateContent.
objMeta object.ContentMeta
// objMetaValid is set once objMeta has been filled by a successful validation,
// so later WriteObject calls reuse it.
objMetaValid bool
}
// WriteObject stores obj following the erasure-coding rules of the writer.
//
// The call first gives relayIfNotContainerNode a chance to forward the object;
// if the object was relayed, WriteObject returns without writing anything
// itself. Otherwise the payload content is validated once (the resulting
// metadata is cached on the writer) and the object is either written as a
// single EC part, when it already has an EC header, or split into parts.
//
// errUnsupportedECObject is returned when object.IsECSupported reports false;
// handling that case is left to the caller.
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
	switch relayed, err := e.relayIfNotContainerNode(ctx, obj); {
	case err != nil:
		return err
	case relayed:
		return nil
	}

	if !object.IsECSupported(obj) {
		// must be resolved by caller
		return errUnsupportedECObject
	}

	if !e.objMetaValid {
		var err error
		e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj)
		if err != nil {
			return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
		}
		e.objMetaValid = true
	}

	if obj.ECHeader() == nil {
		return e.writeRawObject(ctx, obj)
	}
	return e.writeECPart(ctx, obj)
}
// relayIfNotContainerNode forwards the request to a container node when a
// relay callback is configured and the local node is not a container node.
//
// It returns true only if the relay succeeded. It returns false with a nil
// error when no relay is configured or when the local node is itself part of
// the container, in which case the caller proceeds with its own write.
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
	if e.relay == nil {
		return false, nil
	}

	isContainerNode, err := e.currentNodeIsContainerNode()
	switch {
	case err != nil:
		return false, err
	case isContainerNode:
		// the object can be split or saved locally
		return false, nil
	}

	// Placement is computed for the object itself, or for the parent object
	// when obj is an EC part; the part index is passed on as the index.
	var (
		targetID = object.AddressOf(obj).Object()
		partIdx  uint32
	)
	if ecHeader := obj.ECHeader(); ecHeader != nil {
		targetID = ecHeader.Parent()
		partIdx = ecHeader.Index()
	}

	if err := e.relayToContainerNode(ctx, targetID, partIdx); err != nil {
		return false, err
	}
	return true, nil
}
// currentNodeIsContainerNode reports whether any node produced by a traverser
// built from the writer's placement options has a public key that
// netmapKeys.IsLocalKey recognises as local.
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
	traverser, err := placement.NewTraverser(e.placementOpts...)
	if err != nil {
		return false, err
	}

	// An empty batch from Next ends the traversal.
	for batch := traverser.Next(); len(batch) != 0; batch = traverser.Next() {
		for i := range batch {
			if e.cfg.netmapKeys.IsLocalKey(batch[i].PublicKey()) {
				return true, nil
			}
		}
	}
	return false, nil
}
// relayToContainerNode relays the request to the first node of objID's
// placement that accepts it.
//
// Nodes are walked batch by batch; within a batch the walk starts at position
// index (modulo the batch size) and wraps around. For every candidate a client
// is obtained and e.relay is run on the remote pool; the call blocks until the
// relay attempt finishes. The first successful attempt ends the call with nil.
//
// A client construction error or a pool submission error aborts the call
// immediately. If every attempt fails, the last relay error is returned
// wrapped in errIncompletePut. If the traverser yields no nodes at all, nil
// is returned.
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
	// Clip the capacity so that append always allocates a fresh backing array
	// instead of writing into spare capacity shared with other users of
	// e.placementOpts.
	opts := append(e.placementOpts[:len(e.placementOpts):len(e.placementOpts)], placement.ForObject(objID))
	t, err := placement.NewTraverser(opts...)
	if err != nil {
		return err
	}

	var lastErr error
	offset := int(index)
	for {
		nodes := t.Next()
		if len(nodes) == 0 {
			break
		}
		for idx := range nodes {
			node := nodes[(idx+offset)%len(nodes)]
			var info client.NodeInfo
			client.NodeInfoFromNetmapElement(&info, node)

			c, err := e.cfg.clientConstructor.Get(info)
			if err != nil {
				return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
			}

			// completed is closed by the pool task; err is only read after
			// the close has been observed.
			completed := make(chan struct{})
			if poolErr := e.cfg.remotePool.Submit(func() {
				defer close(completed)
				err = e.relay(ctx, info, c)
			}); poolErr != nil {
				close(completed)
				svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
				return poolErr
			}
			<-completed

			if err == nil {
				return nil
			}
			// Include the cause so a failed relay can be diagnosed from the log.
			e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode,
				zap.Stringers("address_group", info.AddressGroup()),
				zap.Error(err))
			lastErr = err
		}
	}

	if lastErr == nil {
		return nil
	}
	return errIncompletePut{
		singleErr: lastErr,
	}
}
// writeECPart stores an object that already carries an EC header.
//
// With commonPrm.LocalOnly set the part is written to the local storage only.
// Otherwise placement is computed for the parent object and, for every node
// batch returned by the traverser, a concurrent writePart is started with the
// part index taken from the EC header. Any failure is returned wrapped in
// errIncompletePut.
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
	if e.commonPrm.LocalOnly() {
		return e.writePartLocal(ctx, obj)
	}

	// Clip the capacity so that append always allocates a fresh backing array
	// instead of writing into spare capacity shared with other users of
	// e.placementOpts.
	opts := append(e.placementOpts[:len(e.placementOpts):len(e.placementOpts)], placement.ForObject(obj.ECHeader().Parent()))
	t, err := placement.NewTraverser(opts...)
	if err != nil {
		return err
	}

	partIdx := int(obj.ECHeader().Index())

	eg, egCtx := errgroup.WithContext(ctx)
	for {
		nodes := t.Next()
		if len(nodes) == 0 {
			break
		}

		eg.Go(func() error {
			// A single part is written per batch, so it gets its own fresh
			// visited set.
			return e.writePart(egCtx, obj, partIdx, nodes, make([]atomic.Bool, len(nodes)))
		})
		// Success is reported to the traverser right after scheduling, before
		// the outcome of the write is known.
		t.SubmitSuccess()
	}

	if err := eg.Wait(); err != nil {
		return errIncompletePut{
			singleErr: err,
		}
	}
	return nil
}
// writeRawObject splits obj into EC parts and stores all of them.
//
// The data and parity counts come from the container's placement policy. For
// every node batch of the object's placement, each part is written
// concurrently via writePart. All parts of a batch share one visited set in
// which the slot of every part's preferred node (part index modulo batch
// size) is pre-marked. Any failure is returned wrapped in errIncompletePut.
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
	// now only single EC policy is supported
	c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
	if err != nil {
		return err
	}
	parts, err := c.Split(obj, e.key)
	if err != nil {
		return err
	}

	// The presence flag returned by ID is deliberately discarded here.
	objID, _ := obj.ID()

	// Clip the capacity so that append always allocates a fresh backing array
	// instead of writing into spare capacity shared with other users of
	// e.placementOpts.
	opts := append(e.placementOpts[:len(e.placementOpts):len(e.placementOpts)], placement.ForObject(objID))
	t, err := placement.NewTraverser(opts...)
	if err != nil {
		return err
	}

	eg, egCtx := errgroup.WithContext(ctx)
	for {
		nodes := t.Next()
		if len(nodes) == 0 {
			break
		}

		// Reserve every part's preferred node up front so that a part falling
		// back after a failure does not take another part's preferred node in
		// its first fallback round.
		visited := make([]atomic.Bool, len(nodes))
		for idx := range parts {
			visited[idx%len(nodes)].Store(true)
		}

		for idx := range parts {
			eg.Go(func() error {
				return e.writePart(egCtx, parts[idx], idx, nodes, visited)
			})
			// Success is reported to the traverser right after scheduling,
			// before the outcome of the write is known.
			t.SubmitSuccess()
		}
	}

	if err := eg.Wait(); err != nil {
		return errIncompletePut{
			singleErr: err,
		}
	}
	return nil
}
// writePart stores one EC part on one node out of nodes.
//
// Nodes are tried in three rounds, stopping at the first success:
//  1. the preferred node at position partIdx modulo len(nodes);
//  2. the following nodes in ring order, but only those this call manages to
//     claim in the shared visited set (i.e. not claimed by another part);
//  3. every node this call has not attempted yet, regardless of visited.
//
// Each failed attempt is logged as a warning. The context is checked before
// the first attempt and before every iteration of rounds 2 and 3; a cancelled
// context ends the call with ctx.Err(). If no node accepts the part, an error
// naming the part address is returned.
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	nodeCount := len(nodes)
	preferred := partIdx % nodeCount

	// attempt tries to store the part on target and reports whether it
	// succeeded; a failure is logged together with the target's public key.
	attempt := func(target placement.Node) bool {
		err := e.putECPartToNode(ctx, obj, target)
		if err == nil {
			return true
		}
		e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
			zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
			zap.String("node", hex.EncodeToString(target.PublicKey())),
			zap.Error(err))
		return false
	}

	// try to save to node for current part index
	if attempt(nodes[preferred]) {
		return nil
	}

	// tried records the nodes this particular part has already been sent to.
	tried := make([]bool, nodeCount)
	tried[preferred] = true

	// try to save to any node not visited by any of other parts
	for step := 1; step < nodeCount; step++ {
		if err := ctx.Err(); err != nil {
			return err
		}

		candidate := (partIdx + step) % nodeCount
		if !visited[candidate].CompareAndSwap(false, true) {
			continue
		}
		if attempt(nodes[candidate]) {
			return nil
		}
		tried[candidate] = true
	}

	// try to save to any node not visited by current part
	for i, alreadyTried := range tried {
		if err := ctx.Err(); err != nil {
			return err
		}

		if alreadyTried {
			continue
		}
		if attempt(nodes[i]) {
			return nil
		}
	}

	return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
}
// putECPartToNode writes obj to node: through the local storage when the
// node's public key is recognised as local by netmapKeys, through a remote
// target otherwise.
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
	isLocal := e.cfg.netmapKeys.IsLocalKey(node.PublicKey())
	if !isLocal {
		return e.writePartRemote(ctx, obj, node)
	}
	return e.writePartLocal(ctx, obj)
}
// writePartLocal writes obj to the local storage on a worker of the local
// pool and blocks until that write has finished.
//
// It returns the pool's error if the task could not be submitted, otherwise
// the result of the write.
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
	target := localTarget{storage: e.cfg.localStore}

	var writeErr error
	done := make(chan struct{})
	task := func() {
		// done is closed when the task returns; writeErr is read only after
		// the close has been observed.
		defer close(done)
		writeErr = target.WriteObject(ctx, obj, e.objMeta)
	}

	if submitErr := e.cfg.localPool.Submit(task); submitErr != nil {
		close(done)
		return submitErr
	}

	<-done
	return writeErr
}
// writePartRemote writes obj to the given remote node on a worker of the
// remote pool and blocks until that write has finished.
//
// It returns the pool's error if the task could not be submitted, otherwise
// the result of the write.
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
	var nodeInfo client.NodeInfo
	client.NodeInfoFromNetmapElement(&nodeInfo, node)

	target := remoteTarget{
		privateKey:        e.key,
		clientConstructor: e.cfg.clientConstructor,
		commonPrm:         e.commonPrm,
		nodeInfo:          nodeInfo,
	}

	var writeErr error
	done := make(chan struct{})
	task := func() {
		// done is closed when the task returns; writeErr is read only after
		// the close has been observed.
		defer close(done)
		writeErr = target.WriteObject(ctx, obj, e.objMeta)
	}

	if submitErr := e.cfg.remotePool.Submit(task); submitErr != nil {
		close(done)
		return submitErr
	}

	<-done
	return writeErr
}