forked from TrueCloudLab/frostfs-node
[#607] object/head: Make client constructor to work with group address
Make Object Head service to work with `AddressGroup` instead of `Address` in order to support multiple addresses of the storage node. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
d0e48c949b
commit
8ac3c62518
3 changed files with 30 additions and 26 deletions
|
@ -149,20 +149,26 @@ func (n *innerRingFetcher) InnerRingKeys() ([][]byte, error) {
|
||||||
|
|
||||||
type coreClientConstructor reputationClientConstructor
|
type coreClientConstructor reputationClientConstructor
|
||||||
|
|
||||||
func (x *coreClientConstructor) Get(addr network.Address) (coreclient.Client, error) {
|
func (x *coreClientConstructor) Get(addrGroup network.AddressGroup) (cc coreclient.Client, err error) {
|
||||||
c, err := (*reputationClientConstructor)(x).Get(addr)
|
var c client.Client
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.(coreclient.Client), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type addressGroupClientConstructor coreClientConstructor
|
|
||||||
|
|
||||||
func (x *addressGroupClientConstructor) Get(addrGroup network.AddressGroup) (c coreclient.Client, err error) {
|
|
||||||
addrGroup.IterateAddresses(func(addr network.Address) bool {
|
addrGroup.IterateAddresses(func(addr network.Address) bool {
|
||||||
c, err = (*coreClientConstructor)(x).Get(addr)
|
c, err = (*reputationClientConstructor)(x).Get(addr)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
cc = c.(coreclient.Client)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
type addressGroupClientConstructor reputationClientConstructor
|
||||||
|
|
||||||
|
func (x *addressGroupClientConstructor) Get(addrGroup network.AddressGroup) (c client.Client, err error) {
|
||||||
|
addrGroup.IterateAddresses(func(addr network.Address) bool {
|
||||||
|
c, err = (*reputationClientConstructor)(x).Get(addr)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -212,7 +218,7 @@ func initObjectService(c *cfg) {
|
||||||
),
|
),
|
||||||
replicator.WithLocalStorage(ls),
|
replicator.WithLocalStorage(ls),
|
||||||
replicator.WithRemoteSender(
|
replicator.WithRemoteSender(
|
||||||
putsvc.NewRemoteSender(keyStorage, groupConstructor),
|
putsvc.NewRemoteSender(keyStorage, coreConstructor),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -231,7 +237,7 @@ func initObjectService(c *cfg) {
|
||||||
policer.WithExpansionRate(10),
|
policer.WithExpansionRate(10),
|
||||||
policer.WithTrigger(ch),
|
policer.WithTrigger(ch),
|
||||||
policer.WithRemoteHeader(
|
policer.WithRemoteHeader(
|
||||||
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
|
headsvc.NewRemoteHeader(keyStorage, groupConstructor),
|
||||||
),
|
),
|
||||||
policer.WithLocalAddressSource(c),
|
policer.WithLocalAddressSource(c),
|
||||||
policer.WithHeadTimeout(
|
policer.WithHeadTimeout(
|
||||||
|
@ -264,7 +270,7 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sPut := putsvc.NewService(
|
sPut := putsvc.NewService(
|
||||||
putsvc.WithKeyStorage(keyStorage),
|
putsvc.WithKeyStorage(keyStorage),
|
||||||
putsvc.WithClientConstructor(groupConstructor),
|
putsvc.WithClientConstructor(coreConstructor),
|
||||||
putsvc.WithMaxSizeSource(c),
|
putsvc.WithMaxSizeSource(c),
|
||||||
putsvc.WithLocalStorage(ls),
|
putsvc.WithLocalStorage(ls),
|
||||||
putsvc.WithContainerSource(c.cfgObject.cnrStorage),
|
putsvc.WithContainerSource(c.cfgObject.cnrStorage),
|
||||||
|
@ -286,7 +292,7 @@ func initObjectService(c *cfg) {
|
||||||
sSearch := searchsvc.New(
|
sSearch := searchsvc.New(
|
||||||
searchsvc.WithLogger(c.log),
|
searchsvc.WithLogger(c.log),
|
||||||
searchsvc.WithLocalStorageEngine(ls),
|
searchsvc.WithLocalStorageEngine(ls),
|
||||||
searchsvc.WithClientConstructor(groupConstructor),
|
searchsvc.WithClientConstructor(coreConstructor),
|
||||||
searchsvc.WithTraverserGenerator(
|
searchsvc.WithTraverserGenerator(
|
||||||
traverseGen.WithTraverseOptions(
|
traverseGen.WithTraverseOptions(
|
||||||
placement.WithoutSuccessTracking(),
|
placement.WithoutSuccessTracking(),
|
||||||
|
@ -303,7 +309,7 @@ func initObjectService(c *cfg) {
|
||||||
sGet := getsvc.New(
|
sGet := getsvc.New(
|
||||||
getsvc.WithLogger(c.log),
|
getsvc.WithLogger(c.log),
|
||||||
getsvc.WithLocalStorageEngine(ls),
|
getsvc.WithLocalStorageEngine(ls),
|
||||||
getsvc.WithClientConstructor(groupConstructor),
|
getsvc.WithClientConstructor(coreConstructor),
|
||||||
getsvc.WithTraverserGenerator(
|
getsvc.WithTraverserGenerator(
|
||||||
traverseGen.WithTraverseOptions(
|
traverseGen.WithTraverseOptions(
|
||||||
placement.SuccessAfter(1),
|
placement.SuccessAfter(1),
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClientConstructor interface {
|
type ClientConstructor interface {
|
||||||
Get(network.Address) (client.Client, error)
|
Get(network.AddressGroup) (client.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteHeader represents utility for getting
|
// RemoteHeader represents utility for getting
|
||||||
|
@ -28,7 +28,7 @@ type RemoteHeader struct {
|
||||||
type RemoteHeadPrm struct {
|
type RemoteHeadPrm struct {
|
||||||
commonHeadPrm *Prm
|
commonHeadPrm *Prm
|
||||||
|
|
||||||
node network.Address
|
node network.AddressGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrNotFound = errors.New("object header not found")
|
var ErrNotFound = errors.New("object header not found")
|
||||||
|
@ -41,8 +41,8 @@ func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *Remo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNodeAddress sets network address of the remote node.
|
// WithNodeAddress sets network address group of the remote node.
|
||||||
func (p *RemoteHeadPrm) WithNodeAddress(v network.Address) *RemoteHeadPrm {
|
func (p *RemoteHeadPrm) WithNodeAddress(v network.AddressGroup) *RemoteHeadPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.node = v
|
p.node = v
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,20 +57,18 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
netAddr := nodes[i].Address()
|
var node network.AddressGroup
|
||||||
|
|
||||||
log := p.log.With(zap.String("node", netAddr))
|
err := node.FromIterator(nodes[i])
|
||||||
|
|
||||||
var node network.Address
|
|
||||||
|
|
||||||
err := node.FromString(netAddr)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not parse network address")
|
p.log.Error("could not parse network address",
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if network.IsLocalAddress(p.localAddrSrc, network.GroupFromAddress(node)) {
|
if network.IsLocalAddress(p.localAddrSrc, node) {
|
||||||
if shortage == 0 {
|
if shortage == 0 {
|
||||||
// we can call the redundant copy callback
|
// we can call the redundant copy callback
|
||||||
// here to slightly improve the performance
|
// here to slightly improve the performance
|
||||||
|
@ -93,7 +91,7 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes
|
||||||
if strings.Contains(err.Error(), headsvc.ErrNotFound.Error()) {
|
if strings.Contains(err.Error(), headsvc.ErrNotFound.Error()) {
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
log.Error("could not receive object header",
|
p.log.Error("could not receive object header",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue