Refactor put service #199

Merged
fyrchik merged 4 commits from dstepanov-yadro/frostfs-node:refactoring/OBJECT_3610_putsvc into master 2023-04-03 15:58:12 +00:00
Showing only changes of commit 00377dca83 - Show all commits

View file

@ -164,7 +164,6 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
return nil
}
// nolint: funlen
func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) {
id, _ := t.obj.ID()
@ -175,72 +174,22 @@ func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transform
return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
}
var resErr atomic.Value
resErr := &atomic.Value{}
loop:
for {
addrs := traverser.Next()
if len(addrs) == 0 {
break
}
wg := new(sync.WaitGroup)
for i := range addrs {
if t.traversal.processed(addrs[i]) {
// it can happen only during additional container broadcast
continue
}
wg.Add(1)
addr := addrs[i]
isLocal := t.isLocalKey(addr.PublicKey())
var workerPool util.WorkerPool
if isLocal {
workerPool = t.localPool
} else {
workerPool = t.remotePool
}
if err := workerPool.Submit(func() {
defer wg.Done()
err := f(nodeDesc{local: isLocal, info: addr})
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addr)
if err != nil {
resErr.Store(err)
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
return
}
traverser.SubmitSuccess()
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(t.log, "PUT", err)
break loop
}
if t.iterateAddresses(traverser, addrs, f, resErr) {
break
}
wg.Wait()
}
if !traverser.Success() {
var err errIncompletePut
err.singleErr, _ = resErr.Load().(error)
return nil, err
}
@ -248,10 +197,7 @@ loop:
if t.traversal.submitPrimaryPlacementFinish() {
_, err = t.iteratePlacement(f)
if err != nil {
t.log.Error("additional container broadcast failure",
zap.Error(err),
)
t.log.Error("additional container broadcast failure", zap.Error(err))
// we don't fail primary operation because of broadcast failure
}
}
@ -261,3 +207,52 @@ loop:
return new(transformer.AccessIdentifiers).
WithSelfID(id), nil
}
func (t *distributedTarget) iterateAddresses(traverser *placement.Traverser, addrs []placement.Node, f func(nodeDesc) error, resErr *atomic.Value) bool {
wg := &sync.WaitGroup{}
for i := range addrs {
if t.traversal.processed(addrs[i]) {
// it can happen only during additional container broadcast
continue
}
wg.Add(1)
addr := addrs[i]
isLocal := t.isLocalKey(addr.PublicKey())
workerPool := t.remotePool
if isLocal {
workerPool = t.localPool
}
if err := workerPool.Submit(func() {
defer wg.Done()
err := f(nodeDesc{local: isLocal, info: addr})
// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
t.traversal.submitProcessed(addr)
if err != nil {
resErr.Store(err)
svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err)
return
}
traverser.SubmitSuccess()
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(t.log, "PUT", err)
return true
}
}
wg.Wait()
return false
}