[#607] object/put: Make client constructor to work with group address

Make Object Put service to work with `AddressGroup` instead of `Address` in
order to support multiple addresses of the storage node.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
remotes/fyrchik/meta-pebble
Leonard Lyubich 2021-06-22 14:12:57 +03:00 committed by Leonard Lyubich
parent 6e5d7f84af
commit 8972f84672
8 changed files with 89 additions and 49 deletions

View File

@ -158,6 +158,17 @@ func (x *coreClientConstructor) Get(addr network.Address) (coreclient.Client, er
return c.(coreclient.Client), nil return c.(coreclient.Client), nil
} }
type addressGroupClientConstructor coreClientConstructor
func (x *addressGroupClientConstructor) Get(addrGroup network.AddressGroup) (c coreclient.Client, err error) {
addrGroup.IterateAddresses(func(addr network.Address) bool {
c, err = (*coreClientConstructor)(x).Get(addr)
return true
})
return
}
func initObjectService(c *cfg) { func initObjectService(c *cfg) {
ls := c.cfgObject.cfgLocalStorage.localStorage ls := c.cfgObject.cfgLocalStorage.localStorage
keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore) keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore)
@ -183,6 +194,8 @@ func initObjectService(c *cfg) {
coreConstructor := (*coreClientConstructor)(clientConstructor) coreConstructor := (*coreClientConstructor)(clientConstructor)
groupConstructor := (*addressGroupClientConstructor)(coreConstructor)
irFetcher := &innerRingFetcher{ irFetcher := &innerRingFetcher{
sidechain: c.cfgMorph.client, sidechain: c.cfgMorph.client,
} }
@ -199,7 +212,7 @@ func initObjectService(c *cfg) {
), ),
replicator.WithLocalStorage(ls), replicator.WithLocalStorage(ls),
replicator.WithRemoteSender( replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, coreConstructor), putsvc.NewRemoteSender(keyStorage, groupConstructor),
), ),
) )
@ -251,7 +264,7 @@ func initObjectService(c *cfg) {
sPut := putsvc.NewService( sPut := putsvc.NewService(
putsvc.WithKeyStorage(keyStorage), putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(coreConstructor), putsvc.WithClientConstructor(groupConstructor),
putsvc.WithMaxSizeSource(c), putsvc.WithMaxSizeSource(c),
putsvc.WithLocalStorage(ls), putsvc.WithLocalStorage(ls),
putsvc.WithContainerSource(c.cfgObject.cnrStorage), putsvc.WithContainerSource(c.cfgObject.cnrStorage),

View File

@ -23,9 +23,9 @@ type distributedTarget struct {
chunks [][]byte chunks [][]byte
nodeTargetInitializer func(network.Address) transformer.ObjectTarget nodeTargetInitializer func(network.AddressGroup) transformer.ObjectTarget
relay func(network.Address) error relay func(network.AddressGroup) error
fmt *object.FormatValidator fmt *object.FormatValidator
@ -68,7 +68,7 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
return t.iteratePlacement(t.sendObject) return t.iteratePlacement(t.sendObject)
} }
func (t *distributedTarget) sendObject(addr network.Address) error { func (t *distributedTarget) sendObject(addr network.AddressGroup) error {
if t.relay != nil { if t.relay != nil {
err := t.relay(addr) err := t.relay(addr)
if err == nil || !errors.Is(err, errLocalAddress) { if err == nil || !errors.Is(err, errLocalAddress) {
@ -86,7 +86,7 @@ func (t *distributedTarget) sendObject(addr network.Address) error {
return nil return nil
} }
func (t *distributedTarget) iteratePlacement(f func(network.Address) error) (*transformer.AccessIdentifiers, error) { func (t *distributedTarget) iteratePlacement(f func(network.AddressGroup) error) (*transformer.AccessIdentifiers, error) {
traverser, err := placement.NewTraverser( traverser, err := placement.NewTraverser(
append(t.traverseOpts, placement.ForObject(t.obj.ID()))..., append(t.traverseOpts, placement.ForObject(t.obj.ID()))...,
) )
@ -110,7 +110,7 @@ loop:
if err := t.workerPool.Submit(func() { if err := t.workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
if err := f(addr); err != nil { if err := f(network.GroupFromAddress(addr)); err != nil {
svcutil.LogServiceError(t.log, "PUT", addr, err) svcutil.LogServiceError(t.log, "PUT", addr, err)
return return
} }

View File

@ -15,7 +15,7 @@ type PutInitPrm struct {
traverseOpts []placement.Option traverseOpts []placement.Option
relay func(network.Address, client.Client) error relay func(network.AddressGroup, client.Client) error
} }
type PutChunkPrm struct { type PutChunkPrm struct {
@ -46,7 +46,7 @@ func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm {
return p return p
} }
func (p *PutInitPrm) WithRelay(f func(network.Address, client.Client) error) *PutInitPrm { func (p *PutInitPrm) WithRelay(f func(network.AddressGroup, client.Client) error) *PutInitPrm {
if p != nil { if p != nil {
p.relay = f p.relay = f
} }

View File

@ -20,7 +20,7 @@ type remoteTarget struct {
commonPrm *util.CommonPrm commonPrm *util.CommonPrm
addr network.Address addr network.AddressGroup
obj *object.Object obj *object.Object
@ -37,7 +37,7 @@ type RemoteSender struct {
// RemotePutPrm groups remote put operation parameters. // RemotePutPrm groups remote put operation parameters.
type RemotePutPrm struct { type RemotePutPrm struct {
node network.Address node network.AddressGroup
obj *object.Object obj *object.Object
} }
@ -86,7 +86,7 @@ func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *Remot
} }
// WithNodeAddress sets network address of the remote node. // WithNodeAddress sets network address of the remote node.
func (p *RemotePutPrm) WithNodeAddress(v network.Address) *RemotePutPrm { func (p *RemotePutPrm) WithNodeAddress(v network.AddressGroup) *RemotePutPrm {
if p != nil { if p != nil {
p.node = v p.node = v
} }

View File

@ -30,7 +30,7 @@ type Service struct {
type Option func(*cfg) type Option func(*cfg)
type ClientConstructor interface { type ClientConstructor interface {
Get(network.Address) (client.Client, error) Get(network.AddressGroup) (client.Client, error)
} }
type cfg struct { type cfg struct {

View File

@ -20,7 +20,7 @@ type Streamer struct {
target transformer.ObjectTarget target transformer.ObjectTarget
relay func(network.Address, client.Client) error relay func(network.AddressGroup, client.Client) error
maxPayloadSz uint64 // network config maxPayloadSz uint64 // network config
} }
@ -147,10 +147,10 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
var errLocalAddress = errors.New("can't relay to local address") var errLocalAddress = errors.New("can't relay to local address")
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
var relay func(network.Address) error var relay func(network.AddressGroup) error
if p.relay != nil { if p.relay != nil {
relay = func(addr network.Address) error { relay = func(addr network.AddressGroup) error {
if network.IsLocalAddress(p.localAddrSrc, network.GroupFromAddress(addr)) { if network.IsLocalAddress(p.localAddrSrc, addr) {
return errLocalAddress return errLocalAddress
} }
@ -166,8 +166,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
return &distributedTarget{ return &distributedTarget{
traverseOpts: prm.traverseOpts, traverseOpts: prm.traverseOpts,
workerPool: p.workerPool, workerPool: p.workerPool,
nodeTargetInitializer: func(addr network.Address) transformer.ObjectTarget { nodeTargetInitializer: func(addr network.AddressGroup) transformer.ObjectTarget {
if network.IsLocalAddress(p.localAddrSrc, network.GroupFromAddress(addr)) { if network.IsLocalAddress(p.localAddrSrc, addr) {
return &localTarget{ return &localTarget{
storage: p.localStore, storage: p.localStore,
} }

View File

@ -125,36 +125,62 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {
return fromPutResponse(resp), nil return fromPutResponse(resp), nil
} }
func (s *streamer) relayRequest(addr network.Address, c client.Client) error { func (s *streamer) relayRequest(addr network.AddressGroup, c client.Client) error {
// open stream // open stream
resp := new(object.PutResponse) resp := new(object.PutResponse)
stream, err := rpc.PutObject(c.RawForAddress(addr), resp) var firstErr error
if err != nil {
return fmt.Errorf("stream opening failed: %w", err)
}
// send init part addr.IterateAddresses(func(addr network.Address) (stop bool) {
err = stream.Write(s.init) var err error
if err != nil {
return fmt.Errorf("sending the initial message to stream failed: %w", err)
}
for i := range s.chunks { defer func() {
if err := stream.Write(s.chunks[i]); err != nil { stop = err == nil
return fmt.Errorf("sending the chunk %d failed: %w", i, err)
if stop || firstErr == nil {
firstErr = err
}
// would be nice to log otherwise
}()
var stream *rpc.PutRequestWriter
stream, err = rpc.PutObject(c.RawForAddress(addr), resp)
if err != nil {
err = fmt.Errorf("stream opening failed: %w", err)
return
} }
}
// close object stream and receive response from remote node // send init part
err = stream.Close() err = stream.Write(s.init)
if err != nil { if err != nil {
return fmt.Errorf("closing the stream failed: %w", err) err = fmt.Errorf("sending the initial message to stream failed: %w", err)
} return
}
// verify response structure for i := range s.chunks {
if err := signature.VerifyServiceMessage(resp); err != nil { if err = stream.Write(s.chunks[i]); err != nil {
return fmt.Errorf("response verification failed: %w", err) err = fmt.Errorf("sending the chunk %d failed: %w", i, err)
} return
return nil }
}
// close object stream and receive response from remote node
err = stream.Close()
if err != nil {
err = fmt.Errorf("closing the stream failed: %w", err)
return
}
// verify response structure
err = signature.VerifyServiceMessage(resp)
if err != nil {
err = fmt.Errorf("response verification failed: %w", err)
}
return
})
return firstErr
} }

View File

@ -2,6 +2,7 @@ package replicator
import ( import (
"context" "context"
"encoding/hex"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network"
@ -66,15 +67,15 @@ func (p *Replicator) handleTask(ctx context.Context, task *Task) {
default: default:
} }
netAddr := task.nodes[i].Address() log := p.log.With(zap.String("node", hex.EncodeToString(task.nodes[i].PublicKey())))
log := p.log.With(zap.String("node", netAddr)) var node network.AddressGroup
var node network.Address err := node.FromIterator(task.nodes[i])
err := node.FromString(netAddr)
if err != nil { if err != nil {
log.Error("could not parse network address") log.Error("could not parse network address",
zap.String("error", err.Error()),
)
continue continue
} }