package putsvc import ( "context" "crypto/ecdsa" "encoding/hex" "errors" "fmt" "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) var _ transformer.ObjectWriter = (*ecWriter)(nil) var errUnsupportedECObject = errors.New("object is not supported for erasure coding") type ecWriter struct { cfg *cfg placementOpts []placement.Option container containerSDK.Container key *ecdsa.PrivateKey commonPrm *svcutil.CommonPrm relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error objMeta object.ContentMeta objMetaValid bool } func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error { relayed, err := e.relayIfNotContainerNode(ctx, obj) if err != nil { return err } if relayed { return nil } if !object.IsECSupported(obj) { // must be resolved by caller return errUnsupportedECObject } if !e.objMetaValid { if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil { return fmt.Errorf("(%T) could not validate payload content: %w", e, err) } e.objMetaValid = true } if obj.ECHeader() != nil { return e.writeECPart(ctx, obj) } return e.writeRawObject(ctx, obj) } func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) { if e.relay == nil { return false, nil } currentNodeIsContainerNode, err := e.currentNodeIsContainerNode() if err != nil { return false, err } if currentNodeIsContainerNode { // object can be splitted or saved local return false, nil } objID := object.AddressOf(obj).Object() var index uint32 if obj.ECHeader() != nil { objID = obj.ECHeader().Parent() index = obj.ECHeader().Index() } if err := e.relayToContainerNode(ctx, objID, index); err != nil { return false, err } return true, nil } func (e *ecWriter) currentNodeIsContainerNode() (bool, error) { t, err := placement.NewTraverser(e.placementOpts...) if err != nil { return false, err } for { nodes := t.Next() if len(nodes) == 0 { break } for _, node := range nodes { if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { return true, nil } } } return false, nil } func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error { t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...) if err != nil { return err } var lastErr error offset := int(index) for { nodes := t.Next() if len(nodes) == 0 { break } for idx := range nodes { node := nodes[(idx+offset)%len(nodes)] var info client.NodeInfo client.NodeInfoFromNetmapElement(&info, node) c, err := e.cfg.clientConstructor.Get(info) if err != nil { return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) } completed := make(chan interface{}) if poolErr := e.cfg.remotePool.Submit(func() { defer close(completed) err = e.relay(ctx, info, c) }); poolErr != nil { close(completed) svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr) return poolErr } <-completed if err == nil { return nil } e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup())) lastErr = err } } if lastErr == nil { return nil } return errIncompletePut{ singleErr: lastErr, } } func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error { if e.commonPrm.LocalOnly() { return e.writePartLocal(ctx, obj) } t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...) if err != nil { return err } eg, egCtx := errgroup.WithContext(ctx) for { nodes := t.Next() if len(nodes) == 0 { break } eg.Go(func() error { return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes, make([]atomic.Bool, len(nodes))) }) t.SubmitSuccess() } if err := eg.Wait(); err != nil { return errIncompletePut{ singleErr: err, } } return nil } func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error { // now only single EC policy is supported c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy())) if err != nil { return err } parts, err := c.Split(obj, e.key) if err != nil { return err } objID, _ := obj.ID() t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...) if err != nil { return err } eg, egCtx := errgroup.WithContext(ctx) for { nodes := t.Next() if len(nodes) == 0 { break } visited := make([]atomic.Bool, len(nodes)) for idx := range parts { visited[idx%len(nodes)].Store(true) } for idx := range parts { eg.Go(func() error { return e.writePart(egCtx, parts[idx], idx, nodes, visited) }) t.SubmitSuccess() } } if err := eg.Wait(); err != nil { return errIncompletePut{ singleErr: err, } } return nil } func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error { select { case <-ctx.Done(): return ctx.Err() default: } // try to save to node for current part index node := nodes[partIdx%len(nodes)] err := e.putECPartToNode(ctx, obj, node) if err == nil { return nil } e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) partVisited := make([]bool, len(nodes)) partVisited[partIdx%len(nodes)] = true // try to save to any node not visited by any of other parts for i := 1; i < len(nodes); i++ { select { case <-ctx.Done(): return ctx.Err() default: } idx := (partIdx + i) % len(nodes) if !visited[idx].CompareAndSwap(false, true) { continue } node = nodes[idx] err := e.putECPartToNode(ctx, obj, node) if err == nil { return nil } e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) partVisited[idx] = true } // try to save to any node not visited by current part for i := 0; i < len(nodes); i++ { select { case <-ctx.Done(): return ctx.Err() default: } if partVisited[i] { continue } node = nodes[i] err := e.putECPartToNode(ctx, obj, node) if err == nil { return nil } e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) } return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj)) } func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { return e.writePartLocal(ctx, obj) } return e.writePartRemote(ctx, obj, node) } func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { var err error localTarget := localTarget{ storage: e.cfg.localStore, } completed := make(chan interface{}) if poolErr := e.cfg.localPool.Submit(func() { defer close(completed) err = localTarget.WriteObject(ctx, obj, e.objMeta) }); poolErr != nil { close(completed) return poolErr } <-completed return err } func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { var clientNodeInfo client.NodeInfo client.NodeInfoFromNetmapElement(&clientNodeInfo, node) remoteTaget := remoteTarget{ privateKey: e.key, clientConstructor: e.cfg.clientConstructor, commonPrm: e.commonPrm, nodeInfo: clientNodeInfo, } var err error completed := make(chan interface{}) if poolErr := e.cfg.remotePool.Submit(func() { defer close(completed) err = remoteTaget.WriteObject(ctx, obj, e.objMeta) }); poolErr != nil { close(completed) return poolErr } <-completed return err }