diff --git a/pkg/services/object/put/common.go b/pkg/services/object/put/common.go
new file mode 100644
index 00000000..97e127e1
--- /dev/null
+++ b/pkg/services/object/put/common.go
@@ -0,0 +1,114 @@
+package putsvc
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
+	svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
+	"go.uber.org/zap"
+)
+
+type nodeIterator struct {
+	traversal
+	cfg *cfg
+}
+
+func (c *cfg) newNodeIterator(opts []placement.Option) *nodeIterator {
+	return &nodeIterator{
+		traversal: traversal{
+			opts:     opts,
+			mExclude: make(map[string]*bool),
+		},
+		cfg: c,
+	}
+}
+
+func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, nodeDesc) error) error {
+	traverser, err := placement.NewTraverser(n.traversal.opts...)
+	if err != nil {
+		return fmt.Errorf("could not create object placement traverser: %w", err)
+	}
+
+	resErr := &atomic.Value{}
+
+	// Must iterate over all replicas, regardless of whether there are identical nodes there.
+	// At the same time, identical nodes must be excluded from processing.
+	for {
+		addrs := traverser.Next()
+		if len(addrs) == 0 {
+			break
+		}
+
+		if n.forEachAddress(ctx, traverser, addrs, f, resErr) {
+			break
+		}
+	}
+
+	if !traverser.Success() {
+		var err errIncompletePut
+		err.singleErr, _ = resErr.Load().(error)
+		return err
+	}
+
+	// Perform an additional container broadcast if needed.
+	if n.traversal.submitPrimaryPlacementFinish() {
+		err := n.forEachNode(ctx, f)
+		if err != nil {
+			n.cfg.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
+			// We don't fail the primary operation because of a broadcast failure.
+		}
+	}
+
+	return nil
+}
+
+func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, nodeDesc) error, resErr *atomic.Value) bool {
+	var wg sync.WaitGroup
+
+	for _, addr := range addrs {
+		addr := addr
+		if ok := n.mExclude[string(addr.PublicKey())]; ok != nil {
+			if *ok {
+				traverser.SubmitSuccess()
+			}
+			// This can happen only during additional container broadcast.
+			continue
+		}
+
+		workerPool, isLocal := n.cfg.getWorkerPool(addr.PublicKey())
+
+		item := new(bool)
+		wg.Add(1)
+		if err := workerPool.Submit(func() {
+			defer wg.Done()
+
+			err := f(ctx, nodeDesc{local: isLocal, info: addr})
+			if err != nil {
+				resErr.Store(err)
+				svcutil.LogServiceError(n.cfg.log, "PUT", addr.Addresses(), err)
+				return
+			}
+
+			traverser.SubmitSuccess()
+			*item = true
+		}); err != nil {
+			wg.Done()
+			svcutil.LogWorkerPoolError(n.cfg.log, "PUT", err)
+			return true
+		}
+
+		// Mark the container node as processed in order to exclude it
+		// in subsequent container broadcast. Note that we don't
+		// process this node during broadcast if primary placement
+		// on it failed.
+		n.traversal.submitProcessed(addr, item)
+	}
+
+	wg.Wait()
+
+	return false
+}
diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go
index df01be9b..577c5a15 100644
--- a/pkg/services/object/put/distributed.go
+++ b/pkg/services/object/put/distributed.go
@@ -3,16 +3,11 @@ package putsvc
 import (
 	"context"
 	"fmt"
-	"sync"
-	"sync/atomic"
-
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
-	svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
-	"go.uber.org/zap"
 )
 
 type preparedObjectTarget interface {
@@ -20,7 +15,8 @@ type preparedObjectTarget interface {
 }
 
 type distributedTarget struct {
-	traversal traversal
+	placementOpts         []placement.Option
+	extraBroadcastEnabled bool
 
 	obj     *objectSDK.Object
 	objMeta object.ContentMeta
@@ -137,7 +133,7 @@ func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Obje
 
 	if len(t.obj.Children()) > 0 {
 		// enabling extra broadcast for linking objects
-		t.traversal.extraBroadcastEnabled = true
+		t.extraBroadcastEnabled = true
 	}
 
 	return t.iteratePlacement(ctx)
@@ -160,90 +156,7 @@ func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error
 func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
 	id, _ := t.obj.ID()
 
-	traverser, err := placement.NewTraverser(
-		append(t.traversal.opts, placement.ForObject(id))...,
-	)
-	if err != nil {
-		return fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
-	}
-
-	resErr := &atomic.Value{}
-
-	// Must iterate over all replicas, regardless of whether there are identical nodes there.
-	// At the same time need to exclude identical nodes from processing.
-	for {
-		addrs := traverser.Next()
-		if len(addrs) == 0 {
-			break
-		}
-
-		if t.iterateAddresses(ctx, traverser, addrs, resErr) {
-			break
-		}
-	}
-
-	if !traverser.Success() {
-		var err errIncompletePut
-		err.singleErr, _ = resErr.Load().(error)
-		return err
-	}
-
-	// perform additional container broadcast if needed
-	if t.traversal.submitPrimaryPlacementFinish() {
-		err = t.iteratePlacement(ctx)
-		if err != nil {
-			t.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
-			// we don't fail primary operation because of broadcast failure
-		}
-	}
-
-	return nil
-}
-
-func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, resErr *atomic.Value) bool {
-	wg := &sync.WaitGroup{}
-
-	for i := range addrs {
-		addr := addrs[i]
-		if val := t.traversal.mExclude[string(addr.PublicKey())]; val != nil {
-			// Check is node processed successful on the previous iteration.
-			if *val {
-				traverser.SubmitSuccess()
-			}
-			// it can happen only during additional container broadcast
-			continue
-		}
-
-		wg.Add(1)
-		item := new(bool)
-
-		workerPool, isLocal := t.getWorkerPool(addr.PublicKey())
-		if err := workerPool.Submit(func() {
-			defer wg.Done()
-
-			err := t.sendObject(ctx, nodeDesc{local: isLocal, info: addr})
-			if err != nil {
-				resErr.Store(err)
-				svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
-				return
-			}
-
-			traverser.SubmitSuccess()
-			*item = true
-		}); err != nil {
-			wg.Done()
-			svcutil.LogWorkerPoolError(t.log, "PUT", err)
-			return true
-		}
-
-		// mark the container node as processed in order to exclude it
-		// in subsequent container broadcast. Note that we don't
-		// process this node during broadcast if primary placement
-		// on it failed.
-		t.traversal.submitProcessed(addr, item)
-	}
-
-	wg.Wait()
-
-	return false
+	iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
+	iter.extraBroadcastEnabled = t.extraBroadcastEnabled
+	return iter.forEachNode(ctx, t.sendObject)
 }
diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go
index eb18f0f4..8a7f192b 100644
--- a/pkg/services/object/put/single.go
+++ b/pkg/services/object/put/single.go
@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"hash"
 	"sync"
-	"sync/atomic"
 
 	objectAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
@@ -150,18 +149,19 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
 	if err != nil {
 		return err
 	}
-	traversal := &traversal{
-		opts: placementOptions,
-		extraBroadcastEnabled: len(obj.Children()) > 0 ||
-			(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
-		mExclude: make(map[string]*bool),
-	}
+
+	iter := s.cfg.newNodeIterator(placementOptions)
+	iter.extraBroadcastEnabled = len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock))
+
 	signer := &putSingleRequestSigner{
 		req:        req,
 		keyStorage: s.keyStorage,
 		signer:     &sync.Once{},
 	}
-	return s.saveAccordingToPlacement(ctx, obj, signer, traversal, meta)
+
+	return iter.forEachNode(ctx, func(ctx context.Context, nd nodeDesc) error {
+		return s.saveToPlacementNode(ctx, &nd, obj, signer, meta)
+	})
 }
 
 func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) {
@@ -199,97 +199,6 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
 	return result, nil
 }
 
-func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.Object, signer *putSingleRequestSigner,
-	traversal *traversal, meta object.ContentMeta) error {
-	traverser, err := placement.NewTraverser(traversal.opts...)
-	if err != nil {
-		return fmt.Errorf("could not create object placement traverser: %w", err)
-	}
-
-	var resultError atomic.Value
-	for {
-		addrs := traverser.Next()
-		if len(addrs) == 0 {
-			break
-		}
-
-		if stop := s.saveToPlacementNodes(ctx, obj, signer, traversal, traverser, addrs, meta, &resultError); stop {
-			break
-		}
-	}
-
-	if !traverser.Success() {
-		var err errIncompletePut
-		err.singleErr, _ = resultError.Load().(error)
-		return err
-	}
-
-	if traversal.submitPrimaryPlacementFinish() {
-		err = s.saveAccordingToPlacement(ctx, obj, signer, traversal, meta)
-		if err != nil {
-			s.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
-		}
-	}
-
-	return nil
-}
-
-func (s *Service) saveToPlacementNodes(ctx context.Context,
-	obj *objectSDK.Object,
-	signer *putSingleRequestSigner,
-	traversal *traversal,
-	traverser *placement.Traverser,
-	nodeAddresses []placement.Node,
-	meta object.ContentMeta,
-	resultError *atomic.Value,
-) bool {
-	wg := sync.WaitGroup{}
-
-	for _, nodeAddress := range nodeAddresses {
-		nodeAddress := nodeAddress
-		if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil {
-			if *ok {
-				traverser.SubmitSuccess()
-			}
-			continue
-		}
-
-		local := false
-		workerPool := s.remotePool
-		if s.netmapKeys.IsLocalKey(nodeAddress.PublicKey()) {
-			local = true
-			workerPool = s.localPool
-		}
-
-		item := new(bool)
-		wg.Add(1)
-		if err := workerPool.Submit(func() {
-			defer wg.Done()
-
-			err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer, meta)
-
-			if err != nil {
-				resultError.Store(err)
-				svcutil.LogServiceError(s.log, "PUT", nodeAddress.Addresses(), err)
-				return
-			}
-
-			traverser.SubmitSuccess()
-			*item = true
-		}); err != nil {
-			wg.Done()
-			svcutil.LogWorkerPoolError(s.log, "PUT", err)
-			return true
-		}
-
-		traversal.submitProcessed(nodeAddress, item)
-	}
-
-	wg.Wait()
-
-	return false
-}
-
 func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
 	signer *putSingleRequestSigner, meta object.ContentMeta) error {
 	if nodeDesc.local {
diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go
index 1b5a926a..10f93284 100644
--- a/pkg/services/object/put/streamer.go
+++ b/pkg/services/object/put/streamer.go
@@ -215,13 +215,10 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
 	withBroadcast := !prm.common.LocalOnly() && (typ == objectSDK.TypeTombstone || typ == objectSDK.TypeLock)
 
 	return &distributedTarget{
-		cfg: p.cfg,
-		traversal: traversal{
-			opts: prm.traverseOpts,
-
-			extraBroadcastEnabled: withBroadcast,
-		},
-		payload: getPayload(),
+		cfg:                   p.cfg,
+		placementOpts:         prm.traverseOpts,
+		extraBroadcastEnabled: withBroadcast,
+		payload:               getPayload(),
 		nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
 			if node.local {
 				return localTarget{
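
For illustration, below is a minimal, self-contained sketch of the iterator-plus-callback pattern this refactor converges on: placement iteration lives in one place (nodeIterator.forEachNode) and each call site only supplies a per-node callback, such as t.sendObject for the streamer or a saveToPlacementNode closure for PUT-single. The types and concurrency here are simplified stand-ins (plain goroutines instead of worker pools, no exclusion map or broadcast state), not the real frostfs-node API.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// nodeDesc mirrors the descriptor handed to each callback invocation.
type nodeDesc struct {
	local bool
	addr  string
}

// nodeIterator is a simplified stand-in for the iterator in common.go.
type nodeIterator struct {
	nodes []nodeDesc
}

// forEachNode fans the callback out to all nodes concurrently and keeps
// the last error in an atomic.Value, as the patched code does.
func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, nodeDesc) error) error {
	var wg sync.WaitGroup
	resErr := &atomic.Value{}

	for _, nd := range n.nodes {
		nd := nd
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := f(ctx, nd); err != nil {
				resErr.Store(err)
			}
		}()
	}
	wg.Wait()

	if err, _ := resErr.Load().(error); err != nil {
		return err
	}
	return nil
}

func main() {
	iter := &nodeIterator{nodes: []nodeDesc{
		{local: true, addr: "self"},
		{addr: "10.0.0.2:8080"},
	}}

	// Call sites differ only in the callback they pass.
	err := iter.forEachNode(context.Background(), func(_ context.Context, nd nodeDesc) error {
		if nd.addr == "" {
			return errors.New("missing address")
		}
		fmt.Printf("saved to %s (local=%v)\n", nd.addr, nd.local)
		return nil
	})
	if err != nil {
		fmt.Println("incomplete put:", err)
	}
}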