package putsvc

import (
	"context"
	"crypto/ecdsa"
	"errors"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
	svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
	containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

// Compile-time check that *ecWriter implements transformer.ObjectWriter.
var _ transformer.ObjectWriter = (*ecWriter)(nil)

// errUnsupportedECObject is returned by WriteObject when object.IsECSupported
// reports that the object cannot be handled by the erasure-coding writer.
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")

// ecWriter writes objects into a container using erasure coding: an object is
// either relayed to a container node, stored as an already prepared EC part,
// or split into EC parts that are then distributed across placement nodes.
type ecWriter struct {
	// cfg provides the dependencies used by the writer: format validator,
	// netmap keys, client constructor, local/remote worker pools, local
	// storage and logger.
	cfg *cfg
	// placementOpts are the options every placement traverser is built from.
	placementOpts []placement.Option
	// container supplies the placement policy from which the EC data and
	// parity counts are taken.
	container containerSDK.Container
	// key is used to split raw objects into parts and is handed to remote targets.
	key *ecdsa.PrivateKey
	// commonPrm is handed to remote targets as is.
	commonPrm *svcutil.CommonPrm
	// relay, when non-nil, forwards the request to the given node. It is only
	// invoked if the current node is not among the container nodes.
	relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error

	// objMeta caches the result of the first successful content validation;
	// objMetaValid tells whether objMeta has been filled.
	objMeta      object.ContentMeta
	objMetaValid bool
}

// WriteObject stores obj according to the erasure-coding scheme.
//
// If a relay function is configured and the current node does not belong to
// the container, the request is relayed and nothing is written locally.
// Otherwise the object is validated (content metadata is computed once and
// reused for subsequent calls) and then written either as an EC part (when it
// already carries an EC header) or as a raw object that is split into parts.
//
// Returns errUnsupportedECObject if the object is not supported for EC.
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
	relayed, err := e.relayIfNotContainerNode(ctx)
	if err != nil {
		return err
	}
	if relayed {
		return nil
	}

	if !object.IsECSupported(obj) {
		// must be resolved by caller
		return errUnsupportedECObject
	}

	if !e.objMetaValid {
		if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
			return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
		}
		e.objMetaValid = true
	}

	if obj.ECHeader() != nil {
		return e.writeECPart(ctx, obj)
	}
	return e.writeRawObject(ctx, obj)
}

// relayIfNotContainerNode relays the request to a container node when a relay
// function is set and the current node is not a container node. The returned
// flag is true only if the relay path was taken and finished without error.
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) {
	if e.relay == nil {
		return false, nil
	}

	isContainerNode, err := e.currentNodeIsContainerNode()
	if err != nil {
		return false, err
	}
	if isContainerNode {
		// object can be split or stored locally
		return false, nil
	}

	if err := e.relayToContainerNode(ctx); err != nil {
		return false, err
	}
	return true, nil
}

// currentNodeIsContainerNode walks the whole placement and reports whether any
// of the placement nodes has a public key recognized as local.
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
	t, err := placement.NewTraverser(e.placementOpts...)
	if err != nil {
		return false, err
	}

	for {
		nodes := t.Next()
		if len(nodes) == 0 {
			break
		}
		for _, node := range nodes {
			if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
				return true, nil
			}
		}
	}
	return false, nil
}

// relayToContainerNode tries the placement nodes one by one, in traversal
// order, and returns nil as soon as a relay succeeds.
//
// A failure to construct a client or to submit the task to the remote pool
// aborts the whole operation immediately. If every relay attempt fails, the
// last error is returned wrapped into errIncompletePut.
//
// NOTE(review): when the traverser yields no nodes at all, nil is returned and
// the caller treats the request as relayed — confirm this is intended.
func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
	t, err := placement.NewTraverser(e.placementOpts...)
	if err != nil {
		return err
	}

	var lastErr error
	for {
		nodes := t.Next()
		if len(nodes) == 0 {
			break
		}
		for _, node := range nodes {
			var info client.NodeInfo
			client.NodeInfoFromNetmapElement(&info, node)

			c, err := e.cfg.clientConstructor.Get(info)
			if err != nil {
				return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
			}

			// completed is closed by the pool task; receiving from it below
			// orders the task's write to err before the read in this goroutine.
			completed := make(chan struct{})
			if poolErr := e.cfg.remotePool.Submit(func() {
				defer close(completed)
				err = e.relay(ctx, info, c)
			}); poolErr != nil {
				close(completed)
				svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
				return poolErr
			}
			<-completed

			if err == nil {
				return nil
			}
			// Only the last error is returned to the caller, so every failed
			// attempt is logged together with its cause.
			e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode,
				zap.Stringers("address_group", info.AddressGroup()),
				zap.Error(err))
			lastErr = err
		}
	}

	if lastErr == nil {
		return nil
	}
	return errIncompletePut{
		singleErr: lastErr,
	}
}

// writeECPart stores an object that already carries an EC header. For every
// node set produced by the traverser the part is written concurrently to the
// node selected by the part index from the EC header. The first error, if any,
// is returned wrapped into errIncompletePut.
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
	t, err := placement.NewTraverser(e.placementOpts...)
	if err != nil {
		return err
	}

	eg, egCtx := errgroup.WithContext(ctx)
	for {
		// nodes is declared per iteration, so the closure below captures
		// its own copy.
		nodes := t.Next()
		if len(nodes) == 0 {
			break
		}
		eg.Go(func() error {
			return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes)
		})
	}

	if err := eg.Wait(); err != nil {
		return errIncompletePut{
			singleErr: err,
		}
	}
	return nil
}

// writeRawObject splits obj into EC parts using the data and parity counts of
// the container placement policy and writes every part concurrently for every
// node set produced by the traverser. The first error, if any, is returned
// wrapped into errIncompletePut.
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
	// now only single EC policy is supported
	c, err := erasurecode.NewConstructor(
		policy.ECDataCount(e.container.PlacementPolicy()),
		policy.ECParityCount(e.container.PlacementPolicy()),
	)
	if err != nil {
		return err
	}

	parts, err := c.Split(obj, e.key)
	if err != nil {
		return err
	}

	t, err := placement.NewTraverser(e.placementOpts...)
	if err != nil {
		return err
	}

	eg, egCtx := errgroup.WithContext(ctx)
	for {
		nodes := t.Next()
		if len(nodes) == 0 {
			break
		}
		for idx := range parts {
			idx := idx // per-iteration copy for the closure
			eg.Go(func() error {
				return e.writePart(egCtx, parts[idx], idx, nodes)
			})
		}
	}

	if err := eg.Wait(); err != nil {
		return errIncompletePut{
			singleErr: err,
		}
	}
	return nil
}

// writePart writes a single part to the node at position partIdx modulo
// len(nodes): locally if the node's public key is local, remotely otherwise.
// A failure is logged and returned.
//
// nodes must not be empty; the callers in this file only pass non-empty sets.
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error {
	var err error
	node := nodes[partIdx%len(nodes)]
	if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
		err = e.writePartLocal(ctx, obj)
	} else {
		err = e.writePartRemote(ctx, obj, node)
	}
	if err == nil {
		return nil
	}

	e.cfg.log.Warn(logs.ECFailedToSaveECPart,
		zap.Stringer("parent_object", object.AddressOf(obj)),
		zap.Error(err))
	return err
}

// writePartLocal writes obj to the local storage through the local worker
// pool and waits for the task to finish. An error from submitting the task is
// returned as is.
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
	var err error
	target := localTarget{
		storage: e.cfg.localStore,
	}

	// completed is closed by the pool task once the write has finished.
	completed := make(chan struct{})
	if poolErr := e.cfg.localPool.Submit(func() {
		defer close(completed)
		err = target.WriteObject(ctx, obj, e.objMeta)
	}); poolErr != nil {
		close(completed)
		return poolErr
	}
	<-completed
	return err
}

// writePartRemote writes obj to the given node through the remote worker pool
// and waits for the task to finish. An error from submitting the task is
// returned as is.
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
	var clientNodeInfo client.NodeInfo
	client.NodeInfoFromNetmapElement(&clientNodeInfo, node)

	target := remoteTarget{
		privateKey:        e.key,
		clientConstructor: e.cfg.clientConstructor,
		commonPrm:         e.commonPrm,
		nodeInfo:          clientNodeInfo,
	}

	var err error
	// completed is closed by the pool task once the write has finished.
	completed := make(chan struct{})
	if poolErr := e.cfg.remotePool.Submit(func() {
		defer close(completed)
		err = target.WriteObject(ctx, obj, e.objMeta)
	}); poolErr != nil {
		close(completed)
		return poolErr
	}
	<-completed
	return err
}