[#607] object/get: Make client constructor to work with group address

Make Object Get service to work with `AddressGroup` instead of `Address` in
order to support multiple addresses of the storage node.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-06-22 15:08:17 +03:00 committed by Leonard Lyubich
parent 8972f84672
commit ad14df07f6
9 changed files with 59 additions and 33 deletions

View file

@ -303,7 +303,7 @@ func initObjectService(c *cfg) {
sGet := getsvc.New( sGet := getsvc.New(
getsvc.WithLogger(c.log), getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls), getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientConstructor(coreConstructor), getsvc.WithClientConstructor(groupConstructor),
getsvc.WithTraverserGenerator( getsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions( traverseGen.WithTraverseOptions(
placement.SuccessAfter(1), placement.SuccessAfter(1),

View file

@ -3,6 +3,7 @@ package getsvc
import ( import (
"context" "context"
"github.com/nspcc-dev/neofs-node/pkg/network"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -78,7 +79,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
// TODO: consider parallel execution // TODO: consider parallel execution
// TODO: consider optimization: if status == SPLIT we can continue until // TODO: consider optimization: if status == SPLIT we can continue until
// we reach the best result - split info with linking object ID. // we reach the best result - split info with linking object ID.
if exec.processNode(ctx, addrs[i]) { if exec.processNode(ctx, network.GroupFromAddress(addrs[i])) {
exec.log.Debug("completing the operation") exec.log.Debug("completing the operation")
return true return true
} }

View file

@ -270,9 +270,7 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) {
} }
} }
func (exec execCtx) remoteClient(node network.Address) (getClient, bool) { func (exec execCtx) remoteClient(node network.AddressGroup) (getClient, bool) {
log := exec.log.With(zap.Stringer("node", node))
c, err := exec.svc.clientCache.get(node) c, err := exec.svc.clientCache.get(node)
switch { switch {
@ -280,7 +278,7 @@ func (exec execCtx) remoteClient(node network.Address) (getClient, bool) {
exec.status = statusUndefined exec.status = statusUndefined
exec.err = err exec.err = err
log.Debug("could not construct remote node client") exec.log.Debug("could not construct remote node client")
case err == nil: case err == nil:
return c, true return c, true
} }

View file

@ -82,8 +82,8 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap
return vs, nil return vs, nil
} }
func (c *testClientCache) get(mAddr network.Address) (getClient, error) { func (c *testClientCache) get(mAddr network.AddressGroup) (getClient, error) {
v, ok := c.clients[mAddr.HostAddr()] v, ok := c.clients[network.StringifyGroup(mAddr)]
if !ok { if !ok {
return nil, errors.New("could not construct client") return nil, errors.New("could not construct client")
} }
@ -100,7 +100,7 @@ func newTestClient() *testClient {
} }
} }
func (c *testClient) getObject(exec *execCtx, _ network.Address) (*objectSDK.Object, error) { func (c *testClient) getObject(exec *execCtx, _ network.AddressGroup) (*objectSDK.Object, error) {
v, ok := c.results[exec.address().String()] v, ok := c.results[exec.address().String()]
if !ok { if !ok {
return nil, object.ErrNotFound return nil, object.ErrNotFound
@ -406,16 +406,16 @@ func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) {
strconv.Itoa(60000+j), strconv.Itoa(60000+j),
) )
var na network.Address
err := na.FromString(a)
require.NoError(t, err)
as[j] = na.HostAddr()
ni := netmap.NewNodeInfo() ni := netmap.NewNodeInfo()
ni.SetAddress(a) ni.SetAddress(a)
var na network.AddressGroup
err := na.FromIterator(ni)
require.NoError(t, err)
as[j] = network.StringifyGroup(na)
ns[j] = *ni ns[j] = *ni
} }

View file

@ -34,7 +34,7 @@ type RangeHashPrm struct {
salt []byte salt []byte
} }
type RequestForwarder func(network.Address, coreclient.Client) (*objectSDK.Object, error) type RequestForwarder func(network.AddressGroup, coreclient.Client) (*objectSDK.Object, error)
// HeadPrm groups parameters of Head service call. // HeadPrm groups parameters of Head service call.
type HeadPrm struct { type HeadPrm struct {

View file

@ -10,10 +10,8 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (exec *execCtx) processNode(ctx context.Context, addr network.Address) bool { func (exec *execCtx) processNode(ctx context.Context, addr network.AddressGroup) bool {
log := exec.log.With(zap.Stringer("remote node", addr)) exec.log.Debug("processing node...")
log.Debug("processing node...")
client, ok := exec.remoteClient(addr) client, ok := exec.remoteClient(addr)
if !ok { if !ok {
@ -29,7 +27,7 @@ func (exec *execCtx) processNode(ctx context.Context, addr network.Address) bool
exec.status = statusUndefined exec.status = statusUndefined
exec.err = object.ErrNotFound exec.err = object.ErrNotFound
log.Debug("remote call failed", exec.log.Debug("remote call failed",
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
case err == nil: case err == nil:

View file

@ -22,7 +22,7 @@ type Service struct {
type Option func(*cfg) type Option func(*cfg)
type getClient interface { type getClient interface {
getObject(*execCtx, network.Address) (*objectSDK.Object, error) getObject(*execCtx, network.AddressGroup) (*objectSDK.Object, error)
} }
type cfg struct { type cfg struct {
@ -35,7 +35,7 @@ type cfg struct {
} }
clientCache interface { clientCache interface {
get(network.Address) (getClient, error) get(network.AddressGroup) (getClient, error)
} }
traverserGenerator interface { traverserGenerator interface {
@ -93,7 +93,7 @@ func WithLocalStorageEngine(e *engine.StorageEngine) Option {
} }
type ClientConstructor interface { type ClientConstructor interface {
Get(network.Address) (client.Client, error) Get(network.AddressGroup) (client.Client, error)
} }
// WithClientConstructor returns option to set constructor of remote node clients. // WithClientConstructor returns option to set constructor of remote node clients.

View file

@ -73,7 +73,7 @@ func (s *SimpleObjectWriter) Object() *object.Object {
return s.obj.Object() return s.obj.Object()
} }
func (c *clientCacheWrapper) get(addr network.Address) (getClient, error) { func (c *clientCacheWrapper) get(addr network.AddressGroup) (getClient, error) {
clt, err := c.cache.Get(addr) clt, err := c.cache.Get(addr)
return &clientWrapper{ return &clientWrapper{
@ -81,7 +81,7 @@ func (c *clientCacheWrapper) get(addr network.Address) (getClient, error) {
}, err }, err
} }
func (c *clientWrapper) getObject(exec *execCtx, addr network.Address) (*objectSDK.Object, error) { func (c *clientWrapper) getObject(exec *execCtx, addr network.AddressGroup) (*objectSDK.Object, error) {
if !exec.assembling && exec.prm.forwarder != nil { if !exec.assembling && exec.prm.forwarder != nil {
return exec.prm.forwarder(addr, c.client) return exec.prm.forwarder(addr, c.client)
} }

View file

@ -55,7 +55,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
if !commonPrm.LocalOnly() { if !commonPrm.LocalOnly() {
var onceResign sync.Once var onceResign sync.Once
p.SetRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) {
var err error var err error
// once compose and resign forwarding request // once compose and resign forwarding request
@ -144,7 +144,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
// convert the object // convert the object
return objectSDK.NewFromV2(obj), nil return objectSDK.NewFromV2(obj), nil
}) }))
} }
return p, nil return p, nil
@ -177,7 +177,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
if !commonPrm.LocalOnly() { if !commonPrm.LocalOnly() {
var onceResign sync.Once var onceResign sync.Once
p.SetRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) {
var err error var err error
// once compose and resign forwarding request // once compose and resign forwarding request
@ -242,7 +242,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
obj.SetPayload(payload) obj.SetPayload(payload)
return obj.Object(), nil return obj.Object(), nil
}) }))
} }
return p, nil return p, nil
@ -340,7 +340,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
if !commonPrm.LocalOnly() { if !commonPrm.LocalOnly() {
var onceResign sync.Once var onceResign sync.Once
p.SetRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) {
var err error var err error
// once compose and resign forwarding request // once compose and resign forwarding request
@ -439,7 +439,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp
// convert the object // convert the object
return raw.Object().SDK(), nil return raw.Object().SDK(), nil
}) }))
} }
return p, nil return p, nil
@ -507,3 +507,32 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
return sh return sh
} }
func groupAddressRequestForwarder(f func(network.Address, client.Client) (*objectSDK.Object, error)) getsvc.RequestForwarder {
return func(addrGroup network.AddressGroup, c client.Client) (*objectSDK.Object, error) {
var (
firstErr error
res *objectSDK.Object
)
addrGroup.IterateAddresses(func(addr network.Address) (stop bool) {
var err error
defer func() {
stop = err == nil
if stop || firstErr == nil {
firstErr = err
}
// would be nice to log otherwise
}()
res, err = f(addr, c)
return
})
return res, firstErr
}
}