From 05ac9e36375ffeae0818a4eae2ff2c931aa759bc Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 26 Jul 2023 16:01:02 +0300 Subject: [PATCH] [#547] objectsvc: Work with `traversal` struct from a single thread Signed-off-by: Evgenii Stratonikov --- pkg/services/object/put/distributed.go | 20 ++++++-------------- pkg/services/object/put/single.go | 5 ++--- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index cf5cc558..f4afd44a 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -47,9 +47,6 @@ type traversal struct { // need of additional broadcast after the object is saved extraBroadcastEnabled bool - // mtx protects mExclude map. - mtx sync.RWMutex - // container nodes which was processed during the primary object placement mExclude map[string]struct{} } @@ -75,21 +72,17 @@ func (x *traversal) submitProcessed(n placement.Node) { if x.extraBroadcastEnabled { key := string(n.PublicKey()) - x.mtx.Lock() if x.mExclude == nil { x.mExclude = make(map[string]struct{}, 1) } x.mExclude[key] = struct{}{} - x.mtx.Unlock() } } // checks if specified node was processed during the primary object placement. func (x *traversal) processed(n placement.Node) bool { - x.mtx.RLock() _, ok := x.mExclude[string(n.PublicKey())] - x.mtx.RUnlock() return ok } @@ -235,13 +228,6 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla defer wg.Done() err := t.sendObject(ctx, nodeDesc{local: isLocal, info: addr}) - - // mark the container node as processed in order to exclude it - // in subsequent container broadcast. Note that we don't - // process this node during broadcast if primary placement - // on it failed. - t.traversal.submitProcessed(addr) - if err != nil { resErr.Store(err) svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) @@ -254,6 +240,12 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla svcutil.LogWorkerPoolError(t.log, "PUT", err) return true } + + // mark the container node as processed in order to exclude it + // in subsequent container broadcast. Note that we don't + // process this node during broadcast if primary placement + // on it failed. + t.traversal.submitProcessed(addr) } wg.Wait() diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go index b5a88c50..9d17a812 100644 --- a/pkg/services/object/put/single.go +++ b/pkg/services/object/put/single.go @@ -154,7 +154,6 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o opts: placementOptions, extraBroadcastEnabled: len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)), - mtx: sync.RWMutex{}, mExclude: make(map[string]struct{}), } signer := &putSingleRequestSigner{ @@ -265,8 +264,6 @@ func (s *Service) saveToPlacementNodes(ctx context.Context, err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer, meta) - traversal.submitProcessed(nodeAddress) - if err != nil { resultError.Store(err) svcutil.LogServiceError(s.log, "PUT", nodeAddress.Addresses(), err) @@ -279,6 +276,8 @@ func (s *Service) saveToPlacementNodes(ctx context.Context, svcutil.LogWorkerPoolError(s.log, "PUT", err) return true } + + traversal.submitProcessed(nodeAddress) } wg.Wait()