frostfs-node/pkg/services/object/get/v2/get_range_hash.go

216 lines
6.3 KiB
Go
Raw Normal View History

package getsvc
import (
"context"
"encoding/hex"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
// GetRangeHash serves a v2 GetRangeHash request.
//
// The request is first checked by needToForwardGetRangeHashRequest: when
// forwarding is required, the request is handed to forwardGetRangeHashRequest
// and its result is returned as is. Otherwise the request is converted to the
// internal parameters, executed by the underlying service, and the result is
// converted back to a v2 response.
func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
	fwd, err := s.needToForwardGetRangeHashRequest(req)
	switch {
	case err != nil:
		return nil, err
	case fwd.needToForward:
		return s.forwardGetRangeHashRequest(ctx, req, fwd)
	}

	// Local processing path.
	prm, err := s.toHashRangePrm(req)
	if err != nil {
		return nil, err
	}

	hashes, err := s.svc.GetRangeHash(ctx, *prm)
	if err != nil {
		return nil, err
	}

	return toHashResponse(req.GetBody().GetType(), hashes), nil
}
// getRangeForwardParams is the result of needToForwardGetRangeHashRequest: it
// tells whether a GetRangeHash request has to be forwarded and carries the data
// forwardGetRangeHashRequest needs to do so.
type getRangeForwardParams struct {
// needToForward is set to true when none of containerNodes has a public key
// that s.announcedKeys reports as local.
needToForward bool
// containerNodes are the flattened placement nodes built for the object,
// deduplicated by hex-encoded public key.
containerNodes []netmapSDK.NodeInfo
// address is the object address parsed from the request body.
address oid.Address
}
// needToForwardGetRangeHashRequest decides whether req must be forwarded to
// another node.
//
// Requests whose TTL is 1 or less are never forwarded (a zero value is
// returned). For other requests the object address is parsed, the placement for
// the object is built from the container's placement policy and the netmap of
// the current epoch, and the resulting nodes are deduplicated by public key.
// If one of those nodes has a local key, no forwarding is needed; otherwise
// needToForward is set.
//
// An error is returned when the address is missing or invalid, or when the
// container, epoch, netmap or placement cannot be obtained.
func (s *Service) needToForwardGetRangeHashRequest(req *objectV2.GetRangeHashRequest) (getRangeForwardParams, error) {
	var params getRangeForwardParams

	if ttl := req.GetMetaHeader().GetTTL(); ttl <= 1 {
		return params, nil
	}

	rawAddr := req.GetBody().GetAddress()
	if rawAddr == nil {
		return params, errMissingObjAddress
	}

	// Parse into a local first so that params.address stays untouched on failure.
	var objAddr oid.Address
	if err := objAddr.ReadFromV2(*rawAddr); err != nil {
		return params, errInvalidObjAddress(err)
	}
	params.address = objAddr

	cnrID := objAddr.Container()

	cnr, err := s.contSource.Get(cnrID)
	if err != nil {
		return params, fmt.Errorf("(%T) could not get container: %w", s, err)
	}

	curEpoch, err := s.netmapSource.Epoch()
	if err != nil {
		return params, fmt.Errorf("(%T) could not get epoch: %w", s, err)
	}

	netMap, err := s.netmapSource.GetNetMapByEpoch(curEpoch)
	if err != nil {
		return params, fmt.Errorf("(%T) could not get netmap: %w", s, err)
	}

	objID := objAddr.Object()
	vectors, err := placement.NewNetworkMapBuilder(netMap).BuildPlacement(cnrID, &objID, cnr.Value.PlacementPolicy())
	if err != nil {
		return params, fmt.Errorf("(%T) could not build object placement: %w", s, err)
	}

	// The same node may appear in several placement vectors; keep it once.
	byPublicKey := func(n netmapSDK.NodeInfo) string { return hex.EncodeToString(n.PublicKey()) }
	params.containerNodes = distinctBy(placement.FlattenNodes(vectors), byPublicKey)

	for i := range params.containerNodes {
		if s.announcedKeys.IsLocalKey(params.containerNodes[i].PublicKey()) {
			// This node is part of the placement: handle the request locally.
			return params, nil
		}
	}

	params.needToForward = true
	return params, nil
}
// forwardGetRangeHashRequest re-signs req and sends it to the nodes listed in
// params.containerNodes, one by one, returning the first successful response.
//
// Before sending, req is given a new meta header whose TTL is decremented by
// one and whose origin is the previous meta header; req is modified in place.
// Nodes whose endpoints or external addresses cannot be parsed are skipped with
// a warning. If the context is done, its error is returned. If every node
// fails, the first error encountered is returned; if no node produced an error
// at all, an ObjectNotFound status is returned.
func (s *Service) forwardGetRangeHashRequest(ctx context.Context, req *objectV2.GetRangeHashRequest, params getRangeForwardParams) (*objectV2.GetRangeHashResponse, error) {
	signKey, err := s.keyStorage.GetKey(nil)
	if err != nil {
		return nil, err
	}

	// Wrap the original meta header into a new one with a decremented TTL.
	fwdMeta := new(session.RequestMetaHeader)
	fwdMeta.SetTTL(req.GetMetaHeader().GetTTL() - 1)
	fwdMeta.SetOrigin(req.GetMetaHeader())
	writeCurrentVersion(fwdMeta)
	req.SetMetaHeader(fwdMeta)

	if err = signature.SignServiceMessage(signKey, req); err != nil {
		return nil, err
	}

	// nodeKeyField builds the log field identifying a node by its public key.
	nodeKeyField := func(pub []byte) zap.Field {
		return zap.String("node_public_key", hex.EncodeToString(pub))
	}

	var firstErr error

	for _, node := range params.containerNodes {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}

		var endpoints network.AddressGroup
		if parseErr := endpoints.FromIterator(network.NodeEndpointsIterator(node)); parseErr != nil {
			s.log.Warn(logs.GetSvcV2FailedToParseNodeEndpoints, nodeKeyField(node.PublicKey()))
			continue
		}

		var external network.AddressGroup
		if rawExternal := node.ExternalAddresses(); len(rawExternal) > 0 {
			if parseErr := external.FromStringSlice(rawExternal); parseErr != nil {
				s.log.Warn(logs.GetSvcV2FailedToParseNodeExternalAddresses, nodeKeyField(node.PublicKey()))
				continue
			}
		}

		var target clientCore.NodeInfo
		clientCore.NodeInfoFromNetmapElement(&target, placement.NewNode(endpoints, external, node.PublicKey()))

		resp, nodeErr := s.performGetRangeHashOnNode(ctx, req, target)
		if nodeErr == nil {
			return resp, nil
		}

		// Remember only the first failure; later ones are just logged.
		if firstErr == nil {
			firstErr = nodeErr
		}
		s.log.Debug(logs.GetSvcV2FailedToGetRangeHashFromNode,
			nodeKeyField(node.PublicKey()),
			zap.Stringer("address", params.address),
			zap.Error(nodeErr))
	}

	s.log.Debug(logs.GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes, zap.Stringer("address", params.address), zap.Error(firstErr))

	if firstErr == nil {
		return nil, new(apistatus.ObjectNotFound)
	}
	return nil, firstErr
}
// performGetRangeHashOnNode sends req to the node described by info, trying the
// addresses of its address group one after another.
//
// The first successful response is returned. Previously a failure on an
// earlier address caused the error to be returned even when a later address
// answered successfully; a received response now takes precedence over the
// recorded error.
//
// If the client for the node cannot be obtained, that error is returned. If
// every address fails, the error of the first failed address is returned. If
// no address produced either a response or an error, an ObjectNotFound status
// is returned.
func (s *Service) performGetRangeHashOnNode(ctx context.Context, req *objectV2.GetRangeHashRequest, info clientCore.NodeInfo) (*objectV2.GetRangeHashResponse, error) {
	cl, err := s.clientSource.Get(info)
	if err != nil {
		return nil, err
	}

	var (
		firstErr error
		resp     *objectV2.GetRangeHashResponse
	)

	// The handler returns true to stop the iteration (a response was received)
	// and false to move on to the next address.
	info.AddressGroup().IterateAddresses(func(a network.Address) bool {
		r, addrErr := s.performGetRangeHashOnAddress(ctx, req, cl, a)
		if addrErr != nil {
			if firstErr == nil {
				firstErr = addrErr
			}
			return false
		}
		resp = r
		return true
	})

	// A response obtained from any address wins over errors from other addresses.
	if resp != nil {
		return resp, nil
	}
	if firstErr != nil {
		return nil, firstErr
	}
	return nil, new(apistatus.ObjectNotFound)
}
// performGetRangeHashOnAddress executes the HashObjectRange RPC for req against
// the single network address a, using the raw client that cl provides for that
// address. It returns the RPC response, or a nil response together with the
// error reported by cl.RawForAddress.
func (s *Service) performGetRangeHashOnAddress(ctx context.Context, req *objectV2.GetRangeHashRequest, cl clientCore.MultiAddressClient,
	a network.Address,
) (*objectV2.GetRangeHashResponse, error) {
	var resp *objectV2.GetRangeHashResponse

	callErr := cl.RawForAddress(ctx, a, func(cli *rpcclient.Client) error {
		var rpcErr error
		resp, rpcErr = rpc.HashObjectRange(cli, req, rpcclient.WithContext(ctx))
		return rpcErr
	})
	if callErr != nil {
		return nil, callErr
	}

	return resp, nil
}
// distinctBy returns the elements of source whose key, as computed by
// keySelector, has not been seen earlier in the slice. The first element for
// each key is kept and the relative order of kept elements is preserved.
// keySelector is called exactly once per element. A nil slice is returned when
// source is empty.
func distinctBy[T any, K comparable](source []T, keySelector func(v T) K) []T {
	var unique []T
	seen := make(map[K]struct{})

	for i := range source {
		k := keySelector(source[i])
		if _, dup := seen[k]; dup {
			continue
		}
		seen[k] = struct{}{}
		unique = append(unique, source[i])
	}

	return unique
}