diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 45dd1a65..3bd696b4 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -1,7 +1,6 @@ package putsvc import ( - "errors" "fmt" "sync" "sync/atomic" @@ -23,15 +22,23 @@ type distributedTarget struct { chunks [][]byte - nodeTargetInitializer func(placement.Node) transformer.ObjectTarget + nodeTargetInitializer func(nodeDesc) transformer.ObjectTarget - relay func(placement.Node) error + isLocalKey func([]byte) bool + + relay func(nodeDesc) error fmt *object.FormatValidator log *logger.Logger } +type nodeDesc struct { + local bool + + info placement.Node +} + // errIncompletePut is returned if processing on a container fails. type errIncompletePut struct { singleErr error // error from the last responding node @@ -81,12 +88,9 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { return t.iteratePlacement(t.sendObject) } -func (t *distributedTarget) sendObject(node placement.Node) error { - if t.relay != nil { - err := t.relay(node) - if err == nil || !errors.Is(err, errLocalAddress) { - return err - } +func (t *distributedTarget) sendObject(node nodeDesc) error { + if !node.local && t.relay != nil { + return t.relay(node) } target := t.nodeTargetInitializer(node) @@ -99,7 +103,7 @@ func (t *distributedTarget) sendObject(node placement.Node) error { return nil } -func (t *distributedTarget) iteratePlacement(f func(placement.Node) error) (*transformer.AccessIdentifiers, error) { +func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) { traverser, err := placement.NewTraverser( append(t.traverseOpts, placement.ForObject(t.obj.ID()))..., ) @@ -122,10 +126,13 @@ loop: wg.Add(1) addr := addrs[i] + + isLocal := t.isLocalKey(addr.Key()) + if err := t.workerPool.Submit(func() { defer wg.Done() - if err := f(addr); err != nil { + if err := f(nodeDesc{local: isLocal, info: addr}); err != nil { resErr.Store(err) svcutil.LogServiceError(t.log, "PUT", addr.Addresses(), err) return diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 6388ecb1..dcd9fe71 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -144,17 +144,11 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { return nil } -var errLocalAddress = errors.New("can't relay to local address") - func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { - var relay func(placement.Node) error + var relay func(nodeDesc) error if p.relay != nil { - relay = func(node placement.Node) error { - addr := node.Addresses() - - if p.netmapKeys.IsLocalKey(node.Key()) { - return errLocalAddress - } + relay = func(node nodeDesc) error { + addr := node.info.Addresses() c, err := p.clientConstructor.Get(addr) if err != nil { @@ -168,8 +162,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { return &distributedTarget{ traverseOpts: prm.traverseOpts, workerPool: p.workerPool, - nodeTargetInitializer: func(node placement.Node) transformer.ObjectTarget { - if p.netmapKeys.IsLocalKey(node.Key()) { + nodeTargetInitializer: func(node nodeDesc) transformer.ObjectTarget { + if node.local { return &localTarget{ storage: p.localStore, } @@ -179,13 +173,15 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { ctx: p.ctx, keyStorage: p.keyStorage, commonPrm: prm.common, - addr: node.Addresses(), + addr: node.info.Addresses(), clientConstructor: p.clientConstructor, } }, relay: relay, fmt: p.fmtValidator, log: p.log, + + isLocalKey: p.netmapKeys.IsLocalKey, } }