forked from TrueCloudLab/frostfs-node
[#199] putsvc: Refactor placement iterator
Resolve funlen linter for iteratePlacement method Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
a69c6d1ec9
commit
14d894178e
1 changed files with 53 additions and 58 deletions
|
@ -164,7 +164,6 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint: funlen
|
|
||||||
func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) {
|
func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) {
|
||||||
id, _ := t.obj.ID()
|
id, _ := t.obj.ID()
|
||||||
|
|
||||||
|
@ -175,72 +174,22 @@ func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transform
|
||||||
return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
|
return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var resErr atomic.Value
|
resErr := &atomic.Value{}
|
||||||
|
|
||||||
loop:
|
|
||||||
for {
|
for {
|
||||||
addrs := traverser.Next()
|
addrs := traverser.Next()
|
||||||
if len(addrs) == 0 {
|
if len(addrs) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
wg := new(sync.WaitGroup)
|
if t.iterateAddresses(traverser, addrs, f, resErr) {
|
||||||
|
break
|
||||||
for i := range addrs {
|
|
||||||
if t.traversal.processed(addrs[i]) {
|
|
||||||
// it can happen only during additional container broadcast
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
addr := addrs[i]
|
|
||||||
|
|
||||||
isLocal := t.isLocalKey(addr.PublicKey())
|
|
||||||
|
|
||||||
var workerPool util.WorkerPool
|
|
||||||
|
|
||||||
if isLocal {
|
|
||||||
workerPool = t.localPool
|
|
||||||
} else {
|
|
||||||
workerPool = t.remotePool
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := workerPool.Submit(func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
err := f(nodeDesc{local: isLocal, info: addr})
|
|
||||||
|
|
||||||
// mark the container node as processed in order to exclude it
|
|
||||||
// in subsequent container broadcast. Note that we don't
|
|
||||||
// process this node during broadcast if primary placement
|
|
||||||
// on it failed.
|
|
||||||
t.traversal.submitProcessed(addr)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
resErr.Store(err)
|
|
||||||
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
traverser.SubmitSuccess()
|
|
||||||
}); err != nil {
|
|
||||||
wg.Done()
|
|
||||||
|
|
||||||
svcutil.LogWorkerPoolError(t.log, "PUT", err)
|
|
||||||
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !traverser.Success() {
|
if !traverser.Success() {
|
||||||
var err errIncompletePut
|
var err errIncompletePut
|
||||||
|
|
||||||
err.singleErr, _ = resErr.Load().(error)
|
err.singleErr, _ = resErr.Load().(error)
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,10 +197,7 @@ loop:
|
||||||
if t.traversal.submitPrimaryPlacementFinish() {
|
if t.traversal.submitPrimaryPlacementFinish() {
|
||||||
_, err = t.iteratePlacement(f)
|
_, err = t.iteratePlacement(f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.log.Error("additional container broadcast failure",
|
t.log.Error("additional container broadcast failure", zap.Error(err))
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
|
|
||||||
// we don't fail primary operation because of broadcast failure
|
// we don't fail primary operation because of broadcast failure
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -261,3 +207,52 @@ loop:
|
||||||
return new(transformer.AccessIdentifiers).
|
return new(transformer.AccessIdentifiers).
|
||||||
WithSelfID(id), nil
|
WithSelfID(id), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *distributedTarget) iterateAddresses(traverser *placement.Traverser, addrs []placement.Node, f func(nodeDesc) error, resErr *atomic.Value) bool {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
|
for i := range addrs {
|
||||||
|
if t.traversal.processed(addrs[i]) {
|
||||||
|
// it can happen only during additional container broadcast
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
addr := addrs[i]
|
||||||
|
isLocal := t.isLocalKey(addr.PublicKey())
|
||||||
|
|
||||||
|
workerPool := t.remotePool
|
||||||
|
if isLocal {
|
||||||
|
workerPool = t.localPool
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := workerPool.Submit(func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
err := f(nodeDesc{local: isLocal, info: addr})
|
||||||
|
|
||||||
|
// mark the container node as processed in order to exclude it
|
||||||
|
// in subsequent container broadcast. Note that we don't
|
||||||
|
// process this node during broadcast if primary placement
|
||||||
|
// on it failed.
|
||||||
|
t.traversal.submitProcessed(addr)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
resErr.Store(err)
|
||||||
|
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
traverser.SubmitSuccess()
|
||||||
|
}); err != nil {
|
||||||
|
wg.Done()
|
||||||
|
svcutil.LogWorkerPoolError(t.log, "PUT", err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue