[#501] object/put: refactor distributed target

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2021-05-28 10:34:31 +03:00 committed by Leonard Lyubich
parent 6ca7f4511c
commit a422f42ca9

View file

@ -45,13 +45,6 @@ func (t *distributedTarget) Write(p []byte) (n int, err error) {
}
func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
traverser, err := placement.NewTraverser(
append(t.traverseOpts, placement.ForObject(t.obj.ID()))...,
)
if err != nil {
return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
}
sz := 0
for i := range t.chunks {
@ -70,6 +63,28 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
return nil, fmt.Errorf("(%T) could not validate payload content: %w", t, err)
}
return t.iteratePlacement(t.sendObject)
}
func (t *distributedTarget) sendObject(addr *network.Address) error {
target := t.nodeTargetInitializer(addr)
if err := target.WriteHeader(t.obj); err != nil {
return fmt.Errorf("could not write header: %w", err)
} else if _, err := target.Close(); err != nil {
return fmt.Errorf("could not close object stream: %w", err)
}
return nil
}
func (t *distributedTarget) iteratePlacement(f func(*network.Address) error) (*transformer.AccessIdentifiers, error) {
traverser, err := placement.NewTraverser(
append(t.traverseOpts, placement.ForObject(t.obj.ID()))...,
)
if err != nil {
return nil, fmt.Errorf("(%T) could not create object placement traverser: %w", t, err)
}
loop:
for {
addrs := traverser.Next()
@ -83,21 +98,11 @@ loop:
wg.Add(1)
addr := addrs[i]
if err := t.workerPool.Submit(func() {
defer wg.Done()
target := t.nodeTargetInitializer(addr)
if err := target.WriteHeader(t.obj); err != nil {
svcutil.LogServiceError(t.log, "PUT", addr,
fmt.Errorf("could not write header: %w", err))
return
} else if _, err := target.Close(); err != nil {
svcutil.LogServiceError(t.log, "PUT", addr,
fmt.Errorf("could not close object stream: %w", err))
if err := f(addr); err != nil {
svcutil.LogServiceError(t.log, "PUT", addr, err)
return
}