forked from TrueCloudLab/frostfs-node
88e37ea372
The client needs of the IR application are very limited and rarely change. Interface changes of the client library should not affect the operation of various application packages, if they do not change their requirements for the provided functionality. To localize the use of the base client and facilitate further support, an auxiliary package is implemented that will only be used by the IR application. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
227 lines
6.2 KiB
Go
227 lines
6.2 KiB
Go
package innerring
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/storagegroup"
|
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
|
coreObject "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
neofsapiclient "github.com/nspcc-dev/neofs-node/pkg/innerring/internal/client"
|
|
auditproc "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
|
|
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type (
|
|
ClientCache struct {
|
|
log *zap.Logger
|
|
cache interface {
|
|
Get(clientcore.NodeInfo) (client.Client, error)
|
|
CloseAll()
|
|
}
|
|
key *ecdsa.PrivateKey
|
|
|
|
sgTimeout, headTimeout, rangeTimeout time.Duration
|
|
}
|
|
|
|
clientCacheParams struct {
|
|
Log *zap.Logger
|
|
Key *ecdsa.PrivateKey
|
|
|
|
SGTimeout, HeadTimeout, RangeTimeout time.Duration
|
|
}
|
|
)
|
|
|
|
func newClientCache(p *clientCacheParams) *ClientCache {
|
|
return &ClientCache{
|
|
log: p.Log,
|
|
cache: cache.NewSDKClientCache(),
|
|
key: p.Key,
|
|
sgTimeout: p.SGTimeout,
|
|
headTimeout: p.HeadTimeout,
|
|
rangeTimeout: p.RangeTimeout,
|
|
}
|
|
}
|
|
|
|
func (c *ClientCache) Get(info clientcore.NodeInfo) (client.Client, error) {
|
|
// Because cache is used by `ClientCache` exclusively,
|
|
// client will always have valid key.
|
|
return c.cache.Get(info)
|
|
}
|
|
|
|
// GetSG polls the container from audit task to get the object by id.
|
|
// Returns storage groups structure from received object.
|
|
func (c *ClientCache) GetSG(task *audit.Task, id *object.ID) (*storagegroup.StorageGroup, error) {
|
|
sgAddress := new(object.Address)
|
|
sgAddress.SetContainerID(task.ContainerID())
|
|
sgAddress.SetObjectID(id)
|
|
|
|
return c.getSG(task.AuditContext(), sgAddress, task.NetworkMap(), task.ContainerNodes())
|
|
}
|
|
|
|
func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netmap.Netmap, cn netmap.ContainerNodes) (*storagegroup.StorageGroup, error) {
|
|
nodes, err := placement.BuildObjectPlacement(nm, cn, addr.ObjectID())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't build object placement: %w", err)
|
|
}
|
|
|
|
getParams := new(client.GetObjectParams)
|
|
getParams.WithAddress(addr)
|
|
|
|
var info clientcore.NodeInfo
|
|
|
|
for _, node := range placement.FlattenNodes(nodes) {
|
|
err := clientcore.NodeInfoFromRawNetmapElement(&info, node)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
|
}
|
|
|
|
cli, err := c.getWrappedClient(info)
|
|
if err != nil {
|
|
c.log.Warn("can't setup remote connection",
|
|
zap.String("error", err.Error()))
|
|
|
|
continue
|
|
}
|
|
|
|
cctx, cancel := context.WithTimeout(ctx, c.sgTimeout)
|
|
|
|
// NOTE: we use the function which does not verify object integrity (checksums, signature),
|
|
// but it would be useful to do as part of a data audit.
|
|
payload, err := neofsapiclient.GetObjectPayload(cctx, cli, addr)
|
|
|
|
cancel()
|
|
|
|
if err != nil {
|
|
c.log.Warn("can't get payload of storage group object",
|
|
zap.String("error", err.Error()))
|
|
|
|
continue
|
|
}
|
|
|
|
sg := storagegroup.New()
|
|
|
|
err = sg.Unmarshal(payload)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't parse storage group payload: %w", err)
|
|
}
|
|
|
|
return sg, nil
|
|
}
|
|
|
|
return nil, coreObject.ErrNotFound
|
|
}
|
|
|
|
// GetHeader requests node from the container under audit to return object header by id.
|
|
func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.ID, relay bool) (*object.Object, error) {
|
|
objAddress := new(object.Address)
|
|
objAddress.SetContainerID(task.ContainerID())
|
|
objAddress.SetObjectID(id)
|
|
|
|
var info clientcore.NodeInfo
|
|
|
|
err := clientcore.NodeInfoFromRawNetmapElement(&info, node)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
|
}
|
|
|
|
cli, err := c.getWrappedClient(info)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err)
|
|
}
|
|
|
|
cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout)
|
|
|
|
var obj *object.Object
|
|
|
|
if relay {
|
|
// todo: function sets hardcoded TTL value, but instead we can set TTL based on container length
|
|
obj, err = neofsapiclient.GetObjectHeaderFromContainer(cctx, cli, objAddress)
|
|
} else {
|
|
obj, err = neofsapiclient.GetRawObjectHeaderLocally(cctx, cli, objAddress)
|
|
}
|
|
|
|
cancel()
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("object head error: %w", err)
|
|
}
|
|
|
|
return obj, nil
|
|
}
|
|
|
|
// GetRangeHash requests node from the container under audit to return Tillich-Zemor hash of the
|
|
// payload range of the object with specified identifier.
|
|
func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *object.ID, rng *object.Range) ([]byte, error) {
|
|
objAddress := new(object.Address)
|
|
objAddress.SetContainerID(task.ContainerID())
|
|
objAddress.SetObjectID(id)
|
|
|
|
var info clientcore.NodeInfo
|
|
|
|
err := clientcore.NodeInfoFromRawNetmapElement(&info, node)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
|
}
|
|
|
|
cli, err := c.getWrappedClient(info)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err)
|
|
}
|
|
|
|
cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout)
|
|
|
|
h, err := neofsapiclient.HashObjectRange(cctx, cli, objAddress, rng)
|
|
|
|
cancel()
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("object rangehash error: %w", err)
|
|
}
|
|
|
|
return h, nil
|
|
}
|
|
|
|
func (c *ClientCache) getWrappedClient(info clientcore.NodeInfo) (neofsapiclient.Client, error) {
|
|
// can be also cached
|
|
var cInternal neofsapiclient.Client
|
|
|
|
cli, err := c.Get(info)
|
|
if err != nil {
|
|
return cInternal, fmt.Errorf("could not get API client from cache")
|
|
}
|
|
|
|
cInternal.WrapBasicClient(cli)
|
|
cInternal.SetPrivateKey(c.key)
|
|
|
|
return cInternal, nil
|
|
}
|
|
|
|
func (c ClientCache) ListSG(dst *auditproc.SearchSGDst, prm auditproc.SearchSGPrm) error {
|
|
cli, err := c.getWrappedClient(prm.NodeInfo())
|
|
if err != nil {
|
|
return fmt.Errorf("could not get API client from cache")
|
|
}
|
|
|
|
var cliPrm neofsapiclient.SearchSGPrm
|
|
|
|
cliPrm.SetContext(prm.Context())
|
|
cliPrm.SetContainerID(prm.CID())
|
|
|
|
res, err := cli.SearchSG(cliPrm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dst.WriteIDList(res.IDList())
|
|
|
|
return nil
|
|
}
|