[#547] objectsvc: Work with traversal struct from a single thread

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2023-07-26 16:01:02 +03:00 committed by Evgenii Stratonikov
parent 7b0fdf0202
commit 05ac9e3637
2 changed files with 8 additions and 17 deletions

View file

@ -47,9 +47,6 @@ type traversal struct {
// need of additional broadcast after the object is saved // need of additional broadcast after the object is saved
extraBroadcastEnabled bool extraBroadcastEnabled bool
// mtx protects mExclude map.
mtx sync.RWMutex
// container nodes which was processed during the primary object placement // container nodes which was processed during the primary object placement
mExclude map[string]struct{} mExclude map[string]struct{}
} }
@ -75,21 +72,17 @@ func (x *traversal) submitProcessed(n placement.Node) {
if x.extraBroadcastEnabled { if x.extraBroadcastEnabled {
key := string(n.PublicKey()) key := string(n.PublicKey())
x.mtx.Lock()
if x.mExclude == nil { if x.mExclude == nil {
x.mExclude = make(map[string]struct{}, 1) x.mExclude = make(map[string]struct{}, 1)
} }
x.mExclude[key] = struct{}{} x.mExclude[key] = struct{}{}
x.mtx.Unlock()
} }
} }
// checks if specified node was processed during the primary object placement. // checks if specified node was processed during the primary object placement.
func (x *traversal) processed(n placement.Node) bool { func (x *traversal) processed(n placement.Node) bool {
x.mtx.RLock()
_, ok := x.mExclude[string(n.PublicKey())] _, ok := x.mExclude[string(n.PublicKey())]
x.mtx.RUnlock()
return ok return ok
} }
@ -235,13 +228,6 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
defer wg.Done() defer wg.Done()
err := t.sendObject(ctx, nodeDesc{local: isLocal, info: addr}) err := t.sendObject(ctx, nodeDesc{local: isLocal, info: addr})
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addr)
if err != nil { if err != nil {
resErr.Store(err) resErr.Store(err)
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
@ -254,6 +240,12 @@ func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *pla
svcutil.LogWorkerPoolError(t.log, "PUT", err) svcutil.LogWorkerPoolError(t.log, "PUT", err)
return true return true
} }
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addr)
} }
wg.Wait() wg.Wait()

View file

@ -154,7 +154,6 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
opts: placementOptions, opts: placementOptions,
extraBroadcastEnabled: len(obj.Children()) > 0 || extraBroadcastEnabled: len(obj.Children()) > 0 ||
(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)), (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
mtx: sync.RWMutex{},
mExclude: make(map[string]struct{}), mExclude: make(map[string]struct{}),
} }
signer := &putSingleRequestSigner{ signer := &putSingleRequestSigner{
@ -265,8 +264,6 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer, meta) err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer, meta)
traversal.submitProcessed(nodeAddress)
if err != nil { if err != nil {
resultError.Store(err) resultError.Store(err)
svcutil.LogServiceError(s.log, "PUT", nodeAddress.Addresses(), err) svcutil.LogServiceError(s.log, "PUT", nodeAddress.Addresses(), err)
@ -279,6 +276,8 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
svcutil.LogWorkerPoolError(s.log, "PUT", err) svcutil.LogWorkerPoolError(s.log, "PUT", err)
return true return true
} }
traversal.submitProcessed(nodeAddress)
} }
wg.Wait() wg.Wait()