Fix get range hash #906
5 changed files with 297 additions and 17 deletions
|
@ -176,7 +176,7 @@ func initObjectService(c *cfg) {
|
|||
|
||||
*c.cfgObject.getSvc = *sGet // need smth better
|
||||
|
||||
sGetV2 := createGetServiceV2(sGet, keyStorage)
|
||||
sGetV2 := createGetServiceV2(c, sGet, keyStorage)
|
||||
|
||||
sDelete := createDeleteService(c, keyStorage, sGet, sSearch, sPut)
|
||||
|
||||
|
@ -370,10 +370,16 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
|||
getsvc.WithLogger(c.log))
|
||||
}
|
||||
|
||||
func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
|
||||
func createGetServiceV2(c *cfg, sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
|
||||
return getsvcV2.NewService(
|
||||
getsvcV2.WithInternalService(sGet),
|
||||
getsvcV2.WithKeyStorage(keyStorage),
|
||||
getsvcV2.WithEpochSource(c.netMapSource),
|
||||
getsvcV2.WithClientSource(c.clientCache),
|
||||
getsvcV2.WithContainerSource(c.cfgObject.cnrSource),
|
||||
getsvcV2.WithNetmapSource(c.netMapSource),
|
||||
getsvcV2.WithNetmapAnnouncedKeys(c),
|
||||
getsvcV2.WithLogger(c.log),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -567,4 +567,8 @@ const (
|
|||
EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers"
|
||||
EngineInterruptProcessingZeroCountContainers = "interrupt processing zero-count containers"
|
||||
EngineFailedToGetContainerCounters = "failed to get container counters"
|
||||
GetSvcV2FailedToParseNodeEndpoints = "failed to parse node endpoints"
|
||||
GetSvcV2FailedToParseNodeExternalAddresses = "failed to parse node external addresses"
|
||||
GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node"
|
||||
GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes"
|
||||
)
|
||||
|
|
215
pkg/services/object/get/v2/get_range_hash.go
Normal file
215
pkg/services/object/get/v2/get_range_hash.go
Normal file
|
@ -0,0 +1,215 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
clientCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// GetRangeHash calls internal service and returns v2 response.
|
||||
func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
|
||||
forward, err := s.needToForwardGetRangeHashRequest(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if forward.needToForward {
|
||||
return s.forwardGetRangeHashRequest(ctx, req, forward)
|
||||
}
|
||||
p, err := s.toHashRangePrm(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := s.svc.GetRangeHash(ctx, *p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return toHashResponse(req.GetBody().GetType(), res), nil
|
||||
}
|
||||
|
||||
type getRangeForwardParams struct {
|
||||
needToForward bool
|
||||
containerNodes []netmapSDK.NodeInfo
|
||||
address oid.Address
|
||||
}
|
||||
|
||||
func (s *Service) needToForwardGetRangeHashRequest(req *objectV2.GetRangeHashRequest) (getRangeForwardParams, error) {
|
||||
if req.GetMetaHeader().GetTTL() <= 1 {
|
||||
return getRangeForwardParams{}, nil
|
||||
}
|
||||
|
||||
var result getRangeForwardParams
|
||||
addrV2 := req.GetBody().GetAddress()
|
||||
if addrV2 == nil {
|
||||
return result, errMissingObjAddress
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
err := addr.ReadFromV2(*addrV2)
|
||||
if err != nil {
|
||||
return result, errInvalidObjAddress(err)
|
||||
}
|
||||
result.address = addr
|
||||
|
||||
cont, err := s.contSource.Get(addr.Container())
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("(%T) could not get container: %w", s, err)
|
||||
}
|
||||
|
||||
epoch, err := s.epochSource.Epoch()
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("(%T) could not get epoch: %w", s, err)
|
||||
}
|
||||
|
||||
nm, err := s.netmapSource.GetNetMapByEpoch(epoch)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("(%T) could not get netmap: %w", s, err)
|
||||
}
|
||||
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
objectID := addr.Object()
|
||||
nodesVector, err := builder.BuildPlacement(addr.Container(), &objectID, cont.Value.PlacementPolicy())
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("(%T) could not build object placement: %w", s, err)
|
||||
}
|
||||
result.containerNodes = distinctBy(placement.FlattenNodes(nodesVector), func(n netmapSDK.NodeInfo) string { return hex.EncodeToString(n.PublicKey()) })
|
||||
|
||||
for _, node := range result.containerNodes {
|
||||
if s.announcedKeys.IsLocalKey(node.PublicKey()) {
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
result.needToForward = true
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Service) forwardGetRangeHashRequest(ctx context.Context, req *objectV2.GetRangeHashRequest, params getRangeForwardParams) (*objectV2.GetRangeHashResponse, error) {
|
||||
key, err := s.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metaHdr := new(session.RequestMetaHeader)
|
||||
metaHdr.SetTTL(req.GetMetaHeader().GetTTL() - 1)
|
||||
metaHdr.SetOrigin(req.GetMetaHeader())
|
||||
writeCurrentVersion(metaHdr)
|
||||
req.SetMetaHeader(metaHdr)
|
||||
|
||||
if err := signature.SignServiceMessage(key, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var firstErr error
|
||||
for _, node := range params.containerNodes {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
var addrGr network.AddressGroup
|
||||
if err := addrGr.FromIterator(network.NodeEndpointsIterator(node)); err != nil {
|
||||
s.log.Warn(logs.GetSvcV2FailedToParseNodeEndpoints, zap.String("node_public_key", hex.EncodeToString(node.PublicKey())))
|
||||
continue
|
||||
}
|
||||
|
||||
var extAddr network.AddressGroup
|
||||
if len(node.ExternalAddresses()) > 0 {
|
||||
if err := extAddr.FromStringSlice(node.ExternalAddresses()); err != nil {
|
||||
s.log.Warn(logs.GetSvcV2FailedToParseNodeExternalAddresses, zap.String("node_public_key", hex.EncodeToString(node.PublicKey())))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
var info clientCore.NodeInfo
|
||||
clientCore.NodeInfoFromNetmapElement(&info, placement.NewNode(addrGr, extAddr, node.PublicKey()))
|
||||
|
||||
resp, err := s.performGetRangeHashOnNode(ctx, req, info)
|
||||
if err == nil {
|
||||
return resp, nil
|
||||
}
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
s.log.Debug(logs.GetSvcV2FailedToGetRangeHashFromNode,
|
||||
zap.String("node_public_key", hex.EncodeToString(node.PublicKey())),
|
||||
zap.Stringer("address", params.address),
|
||||
zap.Error(err))
|
||||
}
|
||||
s.log.Debug(logs.GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes, zap.Stringer("address", params.address), zap.Error(firstErr))
|
||||
if firstErr != nil {
|
||||
return nil, firstErr
|
||||
}
|
||||
return nil, new(apistatus.ObjectNotFound)
|
||||
}
|
||||
|
||||
func (s *Service) performGetRangeHashOnNode(ctx context.Context, req *objectV2.GetRangeHashRequest, info clientCore.NodeInfo) (*objectV2.GetRangeHashResponse, error) {
|
||||
cl, err := s.clientSource.Get(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var firstErr error
|
||||
var resp *objectV2.GetRangeHashResponse
|
||||
info.AddressGroup().IterateAddresses(func(a network.Address) bool {
|
||||
resp, err = s.performGetRangeHashOnAddress(ctx, req, cl, a)
|
||||
if err != nil {
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
if firstErr != nil {
|
||||
return nil, firstErr
|
||||
}
|
||||
if resp == nil {
|
||||
return nil, new(apistatus.ObjectNotFound)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Service) performGetRangeHashOnAddress(ctx context.Context, req *objectV2.GetRangeHashRequest, cl clientCore.MultiAddressClient,
|
||||
a network.Address,
|
||||
) (*objectV2.GetRangeHashResponse, error) {
|
||||
var resp *objectV2.GetRangeHashResponse
|
||||
var rpcErr error
|
||||
err := cl.RawForAddress(ctx, a, func(cli *rpcclient.Client) error {
|
||||
resp, rpcErr = rpc.HashObjectRange(cli, req, rpcclient.WithContext(ctx))
|
||||
return rpcErr
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func distinctBy[T any, K comparable](source []T, keySelector func(v T) K) []T {
|
||||
var result []T
|
||||
dict := make(map[K]struct{})
|
||||
for _, v := range source {
|
||||
key := keySelector(v)
|
||||
if _, exists := dict[key]; !exists {
|
||||
result = append(result, v)
|
||||
dict[key] = struct{}{}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
|
@ -5,10 +5,15 @@ import (
|
|||
"errors"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Service implements Get operation of Object service v2.
|
||||
|
@ -19,10 +24,30 @@ type Service struct {
|
|||
// Option represents Service constructor option.
|
||||
type Option func(*cfg)
|
||||
|
||||
type epochSource interface {
|
||||
Epoch() (uint64, error)
|
||||
}
|
||||
|
||||
type clientSource interface {
|
||||
Get(info clientcore.NodeInfo) (clientcore.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
svc *getsvc.Service
|
||||
|
||||
keyStorage *objutil.KeyStorage
|
||||
|
||||
epochSource epochSource
|
||||
|
||||
clientSource clientSource
|
||||
|
||||
netmapSource netmap.Source
|
||||
|
||||
announcedKeys netmap.AnnouncedKeys
|
||||
|
||||
contSource container.Source
|
||||
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
// NewService constructs Service instance from provided options.
|
||||
|
@ -76,21 +101,6 @@ func (s *Service) GetRange(req *objectV2.GetRangeRequest, stream objectSvc.GetOb
|
|||
}
|
||||
}
|
||||
|
||||
// GetRangeHash calls internal service and returns v2 response.
|
||||
func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
|
||||
p, err := s.toHashRangePrm(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := s.svc.GetRangeHash(ctx, *p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return toHashResponse(req.GetBody().GetType(), res), nil
|
||||
}
|
||||
|
||||
// Head serves ForstFS API v2 compatible HEAD requests.
|
||||
func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||
resp := new(objectV2.HeadResponse)
|
||||
|
@ -125,3 +135,39 @@ func WithKeyStorage(ks *objutil.KeyStorage) Option {
|
|||
c.keyStorage = ks
|
||||
}
|
||||
}
|
||||
|
||||
func WithEpochSource(es epochSource) Option {
|
||||
return func(c *cfg) {
|
||||
c.epochSource = es
|
||||
}
|
||||
}
|
||||
|
||||
func WithClientSource(cs clientSource) Option {
|
||||
return func(c *cfg) {
|
||||
c.clientSource = cs
|
||||
}
|
||||
}
|
||||
|
||||
func WithNetmapSource(ns netmap.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.netmapSource = ns
|
||||
}
|
||||
}
|
||||
|
||||
func WithNetmapAnnouncedKeys(ak netmap.AnnouncedKeys) Option {
|
||||
return func(c *cfg) {
|
||||
c.announcedKeys = ak
|
||||
}
|
||||
}
|
||||
|
||||
func WithContainerSource(cs container.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.contSource = cs
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get V2 service"))}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -181,6 +181,15 @@ func (x Node) PublicKey() []byte {
|
|||
return x.key
|
||||
}
|
||||
|
||||
// NewNode creates new Node.
|
||||
func NewNode(addresses network.AddressGroup, externalAddresses network.AddressGroup, key []byte) Node {
|
||||
return Node{
|
||||
addresses: addresses,
|
||||
externalAddresses: externalAddresses,
|
||||
key: key,
|
||||
}
|
||||
}
|
||||
|
||||
// Next returns next unprocessed address of the object placement.
|
||||
//
|
||||
// Returns nil if no nodes left or traversal operation succeeded.
|
||||
|
|
Loading…
Reference in a new issue