diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 8a01e6d5..6c3da104 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -158,6 +158,17 @@ func (x *coreClientConstructor) Get(addr network.Address) (coreclient.Client, er return c.(coreclient.Client), nil } +type addressGroupClientConstructor coreClientConstructor + +func (x *addressGroupClientConstructor) Get(addrGroup network.AddressGroup) (c coreclient.Client, err error) { + addrGroup.IterateAddresses(func(addr network.Address) bool { + c, err = (*coreClientConstructor)(x).Get(addr) + return true + }) + + return +} + func initObjectService(c *cfg) { ls := c.cfgObject.cfgLocalStorage.localStorage keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore) @@ -183,6 +194,8 @@ func initObjectService(c *cfg) { coreConstructor := (*coreClientConstructor)(clientConstructor) + groupConstructor := (*addressGroupClientConstructor)(coreConstructor) + irFetcher := &innerRingFetcher{ sidechain: c.cfgMorph.client, } @@ -199,7 +212,7 @@ func initObjectService(c *cfg) { ), replicator.WithLocalStorage(ls), replicator.WithRemoteSender( - putsvc.NewRemoteSender(keyStorage, coreConstructor), + putsvc.NewRemoteSender(keyStorage, groupConstructor), ), ) @@ -251,7 +264,7 @@ func initObjectService(c *cfg) { sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), - putsvc.WithClientConstructor(coreConstructor), + putsvc.WithClientConstructor(groupConstructor), putsvc.WithMaxSizeSource(c), putsvc.WithLocalStorage(ls), putsvc.WithContainerSource(c.cfgObject.cnrStorage), diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index eed9c9bd..903c175e 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -23,9 +23,9 @@ type distributedTarget struct { chunks [][]byte - nodeTargetInitializer func(network.Address) transformer.ObjectTarget + nodeTargetInitializer func(network.AddressGroup) transformer.ObjectTarget - relay func(network.Address) error + relay func(network.AddressGroup) error fmt *object.FormatValidator @@ -68,7 +68,7 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { return t.iteratePlacement(t.sendObject) } -func (t *distributedTarget) sendObject(addr network.Address) error { +func (t *distributedTarget) sendObject(addr network.AddressGroup) error { if t.relay != nil { err := t.relay(addr) if err == nil || !errors.Is(err, errLocalAddress) { @@ -86,7 +86,7 @@ func (t *distributedTarget) sendObject(addr network.Address) error { return nil } -func (t *distributedTarget) iteratePlacement(f func(network.Address) error) (*transformer.AccessIdentifiers, error) { +func (t *distributedTarget) iteratePlacement(f func(network.AddressGroup) error) (*transformer.AccessIdentifiers, error) { traverser, err := placement.NewTraverser( append(t.traverseOpts, placement.ForObject(t.obj.ID()))..., ) @@ -110,7 +110,7 @@ loop: if err := t.workerPool.Submit(func() { defer wg.Done() - if err := f(addr); err != nil { + if err := f(network.GroupFromAddress(addr)); err != nil { svcutil.LogServiceError(t.log, "PUT", addr, err) return } diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index 00a4458c..61e042db 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -15,7 +15,7 @@ type PutInitPrm struct { traverseOpts []placement.Option - relay func(network.Address, client.Client) error + relay func(network.AddressGroup, client.Client) error } type PutChunkPrm struct { @@ -46,7 +46,7 @@ func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm { return p } -func (p *PutInitPrm) WithRelay(f func(network.Address, client.Client) error) *PutInitPrm { +func (p *PutInitPrm) WithRelay(f func(network.AddressGroup, client.Client) error) *PutInitPrm { if p != nil { p.relay = f } diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index 33e790b1..2339a39e 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -20,7 +20,7 @@ type remoteTarget struct { commonPrm *util.CommonPrm - addr network.Address + addr network.AddressGroup obj *object.Object @@ -37,7 +37,7 @@ type RemoteSender struct { // RemotePutPrm groups remote put operation parameters. type RemotePutPrm struct { - node network.Address + node network.AddressGroup obj *object.Object } @@ -86,7 +86,7 @@ func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *Remot } // WithNodeAddress sets network address of the remote node. -func (p *RemotePutPrm) WithNodeAddress(v network.Address) *RemotePutPrm { +func (p *RemotePutPrm) WithNodeAddress(v network.AddressGroup) *RemotePutPrm { if p != nil { p.node = v } diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 26611686..14817d3a 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -30,7 +30,7 @@ type Service struct { type Option func(*cfg) type ClientConstructor interface { - Get(network.Address) (client.Client, error) + Get(network.AddressGroup) (client.Client, error) } type cfg struct { diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 675f9aa4..5474e071 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -20,7 +20,7 @@ type Streamer struct { target transformer.ObjectTarget - relay func(network.Address, client.Client) error + relay func(network.AddressGroup, client.Client) error maxPayloadSz uint64 // network config } @@ -147,10 +147,10 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { var errLocalAddress = errors.New("can't relay to local address") func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { - var relay func(network.Address) error + var relay func(network.AddressGroup) error if p.relay != nil { - relay = func(addr network.Address) error { - if network.IsLocalAddress(p.localAddrSrc, network.GroupFromAddress(addr)) { + relay = func(addr network.AddressGroup) error { + if network.IsLocalAddress(p.localAddrSrc, addr) { return errLocalAddress } @@ -166,8 +166,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { return &distributedTarget{ traverseOpts: prm.traverseOpts, workerPool: p.workerPool, - nodeTargetInitializer: func(addr network.Address) transformer.ObjectTarget { - if network.IsLocalAddress(p.localAddrSrc, network.GroupFromAddress(addr)) { + nodeTargetInitializer: func(addr network.AddressGroup) transformer.ObjectTarget { + if network.IsLocalAddress(p.localAddrSrc, addr) { return &localTarget{ storage: p.localStore, } diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index e1dd3934..4386e4e3 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -125,36 +125,62 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { return fromPutResponse(resp), nil } -func (s *streamer) relayRequest(addr network.Address, c client.Client) error { +func (s *streamer) relayRequest(addr network.AddressGroup, c client.Client) error { // open stream resp := new(object.PutResponse) - stream, err := rpc.PutObject(c.RawForAddress(addr), resp) - if err != nil { - return fmt.Errorf("stream opening failed: %w", err) - } + var firstErr error - // send init part - err = stream.Write(s.init) - if err != nil { - return fmt.Errorf("sending the initial message to stream failed: %w", err) - } + addr.IterateAddresses(func(addr network.Address) (stop bool) { + var err error - for i := range s.chunks { - if err := stream.Write(s.chunks[i]); err != nil { - return fmt.Errorf("sending the chunk %d failed: %w", i, err) + defer func() { + stop = err == nil + + if stop || firstErr == nil { + firstErr = err + } + + // would be nice to log otherwise + }() + + var stream *rpc.PutRequestWriter + + stream, err = rpc.PutObject(c.RawForAddress(addr), resp) + if err != nil { + err = fmt.Errorf("stream opening failed: %w", err) + return } - } - // close object stream and receive response from remote node - err = stream.Close() - if err != nil { - return fmt.Errorf("closing the stream failed: %w", err) - } + // send init part + err = stream.Write(s.init) + if err != nil { + err = fmt.Errorf("sending the initial message to stream failed: %w", err) + return + } - // verify response structure - if err := signature.VerifyServiceMessage(resp); err != nil { - return fmt.Errorf("response verification failed: %w", err) - } - return nil + for i := range s.chunks { + if err = stream.Write(s.chunks[i]); err != nil { + err = fmt.Errorf("sending the chunk %d failed: %w", i, err) + return + } + } + + // close object stream and receive response from remote node + err = stream.Close() + if err != nil { + err = fmt.Errorf("closing the stream failed: %w", err) + return + } + + // verify response structure + err = signature.VerifyServiceMessage(resp) + if err != nil { + err = fmt.Errorf("response verification failed: %w", err) + } + + return + }) + + return firstErr } diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 693f50cc..3786698f 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -2,6 +2,7 @@ package replicator import ( "context" + "encoding/hex" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network" @@ -66,15 +67,15 @@ func (p *Replicator) handleTask(ctx context.Context, task *Task) { default: } - netAddr := task.nodes[i].Address() + log := p.log.With(zap.String("node", hex.EncodeToString(task.nodes[i].PublicKey()))) - log := p.log.With(zap.String("node", netAddr)) + var node network.AddressGroup - var node network.Address - - err := node.FromString(netAddr) + err := node.FromIterator(task.nodes[i]) if err != nil { - log.Error("could not parse network address") + log.Error("could not parse network address", + zap.String("error", err.Error()), + ) continue }