From 1c5e0f90aabaee60f2275b7fb3fead8bedabc019 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 28 Mar 2024 13:46:19 +0300 Subject: [PATCH] [#1064] putsvc: Add EC put Signed-off-by: Dmitrii Stepanov --- go.mod | 1 + go.sum | Bin 41962 -> 42147 bytes internal/logs/logs.go | 2 + pkg/core/container/ec.go | 11 + pkg/core/object/ec.go | 13 + pkg/core/policy/ec.go | 20 ++ pkg/services/object/put/ec.go | 265 ++++++++++++++++++ pkg/services/object/put/single.go | 88 +++++- pkg/services/object/put/streamer.go | 48 +++- pkg/services/object/put/writer.go | 23 ++ .../object_manager/placement/traverser.go | 2 +- 11 files changed, 450 insertions(+), 23 deletions(-) create mode 100644 pkg/core/container/ec.go create mode 100644 pkg/core/object/ec.go create mode 100644 pkg/core/policy/ec.go create mode 100644 pkg/services/object/put/ec.go create mode 100644 pkg/services/object/put/writer.go diff --git a/go.mod b/go.mod index 2544f0fb9..f940b306a 100644 --- a/go.mod +++ b/go.mod @@ -85,6 +85,7 @@ require ( github.com/ipfs/go-cid v0.4.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect + github.com/klauspost/reedsolomon v1.12.1 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect diff --git a/go.sum b/go.sum index 8dbaa53738ab0ab85ffdd0bef14511040a473400..7f7e8439863316f1147df6c6c1c65773ea1569d6 100644 GIT binary patch delta 167 zcmaELoN4h%rVVq9CifX~Iu)g+rWEJr$XR0sf7L 0 +} + +// ECDataCount returns EC data count for EC placement policy. +func ECDataCount(policy netmapSDK.PlacementPolicy) int { + return int(policy.ReplicaDescriptor(0).GetECDataCount()) +} + +// ECParityCount returns EC parity count for EC placement policy. +func ECParityCount(policy netmapSDK.PlacementPolicy) int { + return int(policy.ReplicaDescriptor(0).GetECParityCount()) +} diff --git a/pkg/services/object/put/ec.go b/pkg/services/object/put/ec.go new file mode 100644 index 000000000..74db3c31f --- /dev/null +++ b/pkg/services/object/put/ec.go @@ -0,0 +1,265 @@ +package putsvc + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" + svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +var _ transformer.ObjectWriter = (*ecWriter)(nil) + +var errUnsupportedECObject = errors.New("object is not supported for erasure coding") + +type ecWriter struct { + cfg *cfg + placementOpts []placement.Option + container containerSDK.Container + key *ecdsa.PrivateKey + commonPrm *svcutil.CommonPrm + relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error + + objMeta object.ContentMeta + objMetaValid bool +} + +func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error { + relayed, err := e.relayIfNotContainerNode(ctx) + if err != nil { + return err + } + if relayed { + return nil + } + + if !object.IsECSupported(obj) { + // must be resolved by caller + return errUnsupportedECObject + } + + if !e.objMetaValid { + if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil { + return fmt.Errorf("(%T) could not validate payload content: %w", e, err) + } + e.objMetaValid = true + } + + if obj.ECHeader() != nil { + return e.writeECPart(ctx, obj) + } + return e.writeRawObject(ctx, obj) +} + +func (e *ecWriter) relayIfNotContainerNode(ctx context.Context) (bool, error) { + if e.relay == nil { + return false, nil + } + currentNodeIsContainerNode, err := e.currentNodeIsContainerNode() + if err != nil { + return false, err + } + if currentNodeIsContainerNode { + // object can be splitted or saved local + return false, nil + } + if err := e.relayToContainerNode(ctx); err != nil { + return false, err + } + return true, nil +} + +func (e *ecWriter) currentNodeIsContainerNode() (bool, error) { + t, err := placement.NewTraverser(e.placementOpts...) + if err != nil { + return false, err + } + for { + nodes := t.Next() + if len(nodes) == 0 { + break + } + for _, node := range nodes { + if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { + return true, nil + } + } + } + return false, nil +} + +func (e *ecWriter) relayToContainerNode(ctx context.Context) error { + t, err := placement.NewTraverser(e.placementOpts...) + if err != nil { + return err + } + var lastErr error + for { + nodes := t.Next() + if len(nodes) == 0 { + break + } + for _, node := range nodes { + var info client.NodeInfo + client.NodeInfoFromNetmapElement(&info, node) + + c, err := e.cfg.clientConstructor.Get(info) + if err != nil { + return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) + } + + completed := make(chan interface{}) + if poolErr := e.cfg.remotePool.Submit(func() { + defer close(completed) + err = e.relay(ctx, info, c) + }); poolErr != nil { + close(completed) + svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr) + return poolErr + } + <-completed + + if err == nil { + return nil + } + e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup())) + lastErr = err + } + } + if lastErr == nil { + return nil + } + return errIncompletePut{ + singleErr: lastErr, + } +} + +func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error { + t, err := placement.NewTraverser(e.placementOpts...) + if err != nil { + return err + } + + eg, egCtx := errgroup.WithContext(ctx) + for { + nodes := t.Next() + if len(nodes) == 0 { + break + } + + eg.Go(func() error { + return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes) + }) + } + if err := eg.Wait(); err != nil { + return errIncompletePut{ + singleErr: err, + } + } + return nil +} + +func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error { + // now only single EC policy is supported + c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy())) + if err != nil { + return err + } + parts, err := c.Split(obj, e.key) + if err != nil { + return err + } + t, err := placement.NewTraverser(e.placementOpts...) + if err != nil { + return err + } + + eg, egCtx := errgroup.WithContext(ctx) + for { + nodes := t.Next() + if len(nodes) == 0 { + break + } + + for idx := range parts { + idx := idx + eg.Go(func() error { + return e.writePart(egCtx, parts[idx], idx, nodes) + }) + } + } + if err := eg.Wait(); err != nil { + return errIncompletePut{ + singleErr: err, + } + } + return nil +} + +func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node) error { + var err error + node := nodes[partIdx%len(nodes)] + if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { + err = e.writePartLocal(ctx, obj) + } else { + err = e.writePartRemote(ctx, obj, node) + } + if err == nil { + return nil + } + e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("parent_object", object.AddressOf(obj)), zap.Error(err)) + return err +} + +func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { + var err error + localTarget := localTarget{ + storage: e.cfg.localStore, + } + completed := make(chan interface{}) + if poolErr := e.cfg.localPool.Submit(func() { + defer close(completed) + err = localTarget.WriteObject(ctx, obj, e.objMeta) + }); poolErr != nil { + close(completed) + return poolErr + } + <-completed + return err +} + +func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { + var clientNodeInfo client.NodeInfo + client.NodeInfoFromNetmapElement(&clientNodeInfo, node) + + remoteTaget := remoteTarget{ + privateKey: e.key, + clientConstructor: e.cfg.clientConstructor, + commonPrm: e.commonPrm, + nodeInfo: clientNodeInfo, + } + + var err error + completed := make(chan interface{}) + if poolErr := e.cfg.remotePool.Submit(func() { + defer close(completed) + err = remoteTaget.WriteObject(ctx, obj, e.objMeta) + }); poolErr != nil { + close(completed) + return poolErr + } + <-completed + return err +} diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go index 09e1eb092..43b3b0ac1 100644 --- a/pkg/services/object/put/single.go +++ b/pkg/services/object/put/single.go @@ -16,6 +16,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" @@ -25,6 +26,7 @@ import ( tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/tzhash/tz" "go.opentelemetry.io/otel/attribute" @@ -32,7 +34,10 @@ import ( "go.uber.org/zap" ) -var errInvalidPayloadChecksum = errors.New("incorrect payload checksum") +var ( + errInvalidPayloadChecksum = errors.New("incorrect payload checksum") + errInvalidECObject = errors.New("object must be splitted to EC parts") +) type putSingleRequestSigner struct { req *objectAPI.PutSingleRequest @@ -148,12 +153,20 @@ func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Ob func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { localOnly := req.GetMetaHeader().GetTTL() <= 1 - placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly) + placement, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly) if err != nil { return err } - iter := s.cfg.newNodeIterator(placementOptions) + if placement.isEC { + return s.saveToECReplicas(ctx, placement, obj, req, meta) + } + + return s.saveToREPReplicas(ctx, placement, obj, localOnly, req, meta) +} + +func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { + iter := s.cfg.newNodeIterator(placement.placementOptions) iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly) signer := &putSingleRequestSigner{ @@ -167,38 +180,83 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o }) } -func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) { - var result []placement.Option - if len(copiesNumber) > 0 { - result = append(result, placement.WithCopyNumbers(copiesNumber)) +func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { + if obj.Type() == objectSDK.TypeRegular && obj.ECHeader() == nil { + return errInvalidECObject } + commonPrm, err := svcutil.CommonPrmFromV2(req) + if err != nil { + return err + } + key, err := s.cfg.keyStorage.GetKey(nil) + if err != nil { + return err + } + signer := &putSingleRequestSigner{ + req: req, + keyStorage: s.keyStorage, + signer: &sync.Once{}, + } + + w := ecWriter{ + cfg: s.cfg, + placementOpts: placement.placementOptions, + objMeta: meta, + objMetaValid: true, + commonPrm: commonPrm, + container: placement.container, + key: key, + relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error { + return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac) + }, + } + return w.WriteObject(ctx, obj) +} + +type putSinglePlacement struct { + placementOptions []placement.Option + isEC bool + container containerSDK.Container +} + +func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) { + var result putSinglePlacement + cnrID, ok := obj.ContainerID() if !ok { - return nil, errors.New("missing container ID") + return result, errors.New("missing container ID") } cnrInfo, err := s.cnrSrc.Get(cnrID) if err != nil { - return nil, fmt.Errorf("could not get container by ID: %w", err) + return result, fmt.Errorf("could not get container by ID: %w", err) } - result = append(result, placement.ForContainer(cnrInfo.Value)) + result.container = cnrInfo.Value + result.isEC = container.IsECContainer(cnrInfo.Value) && object.IsECSupported(obj) + if len(copiesNumber) > 0 && !result.isEC { + result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber)) + } + result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value)) objID, ok := obj.ID() if !ok { - return nil, errors.New("missing object ID") + return result, errors.New("missing object ID") } - result = append(result, placement.ForObject(objID)) + if obj.ECHeader() != nil { + objID = obj.ECHeader().Parent() + } + result.placementOptions = append(result.placementOptions, placement.ForObject(objID)) latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc) if err != nil { - return nil, fmt.Errorf("could not get latest network map: %w", err) + return result, fmt.Errorf("could not get latest network map: %w", err) } builder := placement.NewNetworkMapBuilder(latestNetmap) if localOnly { - result = append(result, placement.SuccessAfter(1)) + result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1)) builder = svcutil.NewLocalPlacement(builder, s.netmapKeys) } - result = append(result, placement.UseBuilder(builder)) + result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder)) return result, nil } diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index f32b2ab99..14dae38d5 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -7,7 +7,9 @@ import ( "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" @@ -19,7 +21,7 @@ import ( type Streamer struct { *cfg - sessionKey *ecdsa.PrivateKey + privateKey *ecdsa.PrivateKey target transformer.ChunkedObjectWriter @@ -76,6 +78,12 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error { p.relay = prm.relay + nodeKey, err := p.cfg.keyStorage.GetKey(nil) + if err != nil { + return err + } + p.privateKey = nodeKey + // prepare untrusted-Put object target p.target = &validatingPreparedTarget{ nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)), @@ -102,7 +110,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { } } - sessionKey, err := p.keyStorage.GetKey(sessionInfo) + key, err := p.keyStorage.GetKey(sessionInfo) if err != nil { return fmt.Errorf("(%T) could not receive session key: %w", p, err) } @@ -116,7 +124,7 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { if sToken == nil { var ownerSession user.ID - user.IDFromKey(&ownerSession, sessionKey.PublicKey) + user.IDFromKey(&ownerSession, key.PublicKey) if !ownerObj.Equals(ownerSession) { return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p) @@ -127,11 +135,11 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { } } - p.sessionKey = sessionKey + p.privateKey = key p.target = &validatingTarget{ fmt: p.fmtValidator, nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ - Key: sessionKey, + Key: key, NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) }, NetworkState: p.networkState, MaxSize: p.maxPayloadSz, @@ -171,7 +179,12 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { placement.ForContainer(prm.cnr), ) - if id, ok := prm.hdr.ID(); ok { + if ech := prm.hdr.ECHeader(); ech != nil { + prm.traverseOpts = append(prm.traverseOpts, + // set identifier of the processing object + placement.ForObject(ech.Parent()), + ) + } else if id, ok := prm.hdr.ID(); ok { prm.traverseOpts = append(prm.traverseOpts, // set identifier of the processing object placement.ForObject(id), @@ -196,6 +209,13 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { } func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter { + if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) { + return p.newECWriter(prm) + } + return p.newDefaultObjectWriter(prm) +} + +func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm) transformer.ObjectWriter { var relay func(context.Context, nodeDesc) error if p.relay != nil { relay = func(ctx context.Context, node nodeDesc) error { @@ -223,7 +243,7 @@ func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter { } rt := &remoteTarget{ - privateKey: p.sessionKey, + privateKey: p.privateKey, commonPrm: prm.common, clientConstructor: p.clientConstructor, } @@ -236,6 +256,20 @@ func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter { } } +func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter { + return &objectWriterDispatcher{ + ecWriter: &ecWriter{ + cfg: p.cfg, + placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC + container: prm.cnr, + key: p.privateKey, + commonPrm: prm.common, + relay: p.relay, + }, + repWriter: p.newDefaultObjectWriter(prm), + } +} + func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error { if p.target == nil { return errNotInit diff --git a/pkg/services/object/put/writer.go b/pkg/services/object/put/writer.go new file mode 100644 index 000000000..53eee6006 --- /dev/null +++ b/pkg/services/object/put/writer.go @@ -0,0 +1,23 @@ +package putsvc + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" +) + +var _ transformer.ObjectWriter = (*objectWriterDispatcher)(nil) + +type objectWriterDispatcher struct { + ecWriter transformer.ObjectWriter + repWriter transformer.ObjectWriter +} + +func (m *objectWriterDispatcher) WriteObject(ctx context.Context, obj *objectSDK.Object) error { + if object.IsECSupported(obj) { + return m.ecWriter.WriteObject(ctx, obj) + } + return m.repWriter.WriteObject(ctx, obj) +} diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go index a699b4454..306169571 100644 --- a/pkg/services/object_manager/placement/traverser.go +++ b/pkg/services/object_manager/placement/traverser.go @@ -137,7 +137,7 @@ func defaultCopiesVector(policy netmap.PlacementPolicy) []int { copyVector := make([]int, 0, replNum) for i := 0; i < replNum; i++ { - copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects())) + copyVector = append(copyVector, int(policy.ReplicaDescriptor(i).NumberOfObjects()+policy.ReplicaDescriptor(i).GetECDataCount()+policy.ReplicaDescriptor(i).GetECParityCount())) } return copyVector