diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index e84696e6b..a47ea2654 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -176,7 +176,7 @@ func initObjectService(c *cfg) { *c.cfgObject.getSvc = *sGet // need smth better - sGetV2 := createGetServiceV2(sGet, keyStorage) + sGetV2 := createGetServiceV2(c, sGet, keyStorage) sDelete := createDeleteService(c, keyStorage, sGet, sSearch, sPut) @@ -370,10 +370,16 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra getsvc.WithLogger(c.log)) } -func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service { +func createGetServiceV2(c *cfg, sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service { return getsvcV2.NewService( getsvcV2.WithInternalService(sGet), getsvcV2.WithKeyStorage(keyStorage), + getsvcV2.WithEpochSource(c.netMapSource), + getsvcV2.WithClientSource(c.clientCache), + getsvcV2.WithContainerSource(c.cfgObject.cnrSource), + getsvcV2.WithNetmapSource(c.netMapSource), + getsvcV2.WithNetmapAnnouncedKeys(c), + getsvcV2.WithLogger(c.log), ) } diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 420c6612a..606b5cb9a 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -567,4 +567,8 @@ const ( EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers" EngineInterruptProcessingZeroCountContainers = "interrupt processing zero-count containers" EngineFailedToGetContainerCounters = "failed to get container counters" + GetSvcV2FailedToParseNodeEndpoints = "failed to parse node endpoints" + GetSvcV2FailedToParseNodeExternalAddresses = "failed to parse node external addresses" + GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node" + GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes" ) diff --git a/pkg/services/object/get/v2/get_range_hash.go b/pkg/services/object/get/v2/get_range_hash.go new file mode 100644 index 000000000..d2ed16ebf --- /dev/null +++ b/pkg/services/object/get/v2/get_range_hash.go @@ -0,0 +1,215 @@ +package getsvc + +import ( + "context" + "encoding/hex" + "fmt" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + rpcclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + clientCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" +) + +// GetRangeHash calls internal service and returns v2 response. +func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { + forward, err := s.needToForwardGetRangeHashRequest(req) + if err != nil { + return nil, err + } + if forward.needToForward { + return s.forwardGetRangeHashRequest(ctx, req, forward) + } + p, err := s.toHashRangePrm(req) + if err != nil { + return nil, err + } + + res, err := s.svc.GetRangeHash(ctx, *p) + if err != nil { + return nil, err + } + + return toHashResponse(req.GetBody().GetType(), res), nil +} + +type getRangeForwardParams struct { + needToForward bool + containerNodes []netmapSDK.NodeInfo + address oid.Address +} + +func (s *Service) needToForwardGetRangeHashRequest(req *objectV2.GetRangeHashRequest) (getRangeForwardParams, error) { + if req.GetMetaHeader().GetTTL() <= 1 { + return getRangeForwardParams{}, nil + } + + var result getRangeForwardParams + addrV2 := req.GetBody().GetAddress() + if addrV2 == nil { + return result, errMissingObjAddress + } + + var addr oid.Address + err := addr.ReadFromV2(*addrV2) + if err != nil { + return result, errInvalidObjAddress(err) + } + result.address = addr + + cont, err := s.contSource.Get(addr.Container()) + if err != nil { + return result, fmt.Errorf("(%T) could not get container: %w", s, err) + } + + epoch, err := s.epochSource.Epoch() + if err != nil { + return result, fmt.Errorf("(%T) could not get epoch: %w", s, err) + } + + nm, err := s.netmapSource.GetNetMapByEpoch(epoch) + if err != nil { + return result, fmt.Errorf("(%T) could not get netmap: %w", s, err) + } + + builder := placement.NewNetworkMapBuilder(nm) + + objectID := addr.Object() + nodesVector, err := builder.BuildPlacement(addr.Container(), &objectID, cont.Value.PlacementPolicy()) + if err != nil { + return result, fmt.Errorf("(%T) could not build object placement: %w", s, err) + } + result.containerNodes = distinctBy(placement.FlattenNodes(nodesVector), func(n netmapSDK.NodeInfo) string { return hex.EncodeToString(n.PublicKey()) }) + + for _, node := range result.containerNodes { + if s.announcedKeys.IsLocalKey(node.PublicKey()) { + return result, nil + } + } + result.needToForward = true + return result, nil +} + +func (s *Service) forwardGetRangeHashRequest(ctx context.Context, req *objectV2.GetRangeHashRequest, params getRangeForwardParams) (*objectV2.GetRangeHashResponse, error) { + key, err := s.keyStorage.GetKey(nil) + if err != nil { + return nil, err + } + + metaHdr := new(session.RequestMetaHeader) + metaHdr.SetTTL(req.GetMetaHeader().GetTTL() - 1) + metaHdr.SetOrigin(req.GetMetaHeader()) + writeCurrentVersion(metaHdr) + req.SetMetaHeader(metaHdr) + + if err := signature.SignServiceMessage(key, req); err != nil { + return nil, err + } + + var firstErr error + for _, node := range params.containerNodes { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + var addrGr network.AddressGroup + if err := addrGr.FromIterator(network.NodeEndpointsIterator(node)); err != nil { + s.log.Warn(logs.GetSvcV2FailedToParseNodeEndpoints, zap.String("node_public_key", hex.EncodeToString(node.PublicKey()))) + continue + } + + var extAddr network.AddressGroup + if len(node.ExternalAddresses()) > 0 { + if err := extAddr.FromStringSlice(node.ExternalAddresses()); err != nil { + s.log.Warn(logs.GetSvcV2FailedToParseNodeExternalAddresses, zap.String("node_public_key", hex.EncodeToString(node.PublicKey()))) + continue + } + } + + var info clientCore.NodeInfo + clientCore.NodeInfoFromNetmapElement(&info, placement.NewNode(addrGr, extAddr, node.PublicKey())) + + resp, err := s.performGetRangeHashOnNode(ctx, req, info) + if err == nil { + return resp, nil + } + if firstErr == nil { + firstErr = err + } + s.log.Debug(logs.GetSvcV2FailedToGetRangeHashFromNode, + zap.String("node_public_key", hex.EncodeToString(node.PublicKey())), + zap.Stringer("address", params.address), + zap.Error(err)) + } + s.log.Debug(logs.GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes, zap.Stringer("address", params.address), zap.Error(firstErr)) + if firstErr != nil { + return nil, firstErr + } + return nil, new(apistatus.ObjectNotFound) +} + +func (s *Service) performGetRangeHashOnNode(ctx context.Context, req *objectV2.GetRangeHashRequest, info clientCore.NodeInfo) (*objectV2.GetRangeHashResponse, error) { + cl, err := s.clientSource.Get(info) + if err != nil { + return nil, err + } + + var firstErr error + var resp *objectV2.GetRangeHashResponse + info.AddressGroup().IterateAddresses(func(a network.Address) bool { + resp, err = s.performGetRangeHashOnAddress(ctx, req, cl, a) + if err != nil { + if firstErr == nil { + firstErr = err + } + return false + } + return true + }) + if firstErr != nil { + return nil, firstErr + } + if resp == nil { + return nil, new(apistatus.ObjectNotFound) + } + return resp, nil +} + +func (s *Service) performGetRangeHashOnAddress(ctx context.Context, req *objectV2.GetRangeHashRequest, cl clientCore.MultiAddressClient, + a network.Address, +) (*objectV2.GetRangeHashResponse, error) { + var resp *objectV2.GetRangeHashResponse + var rpcErr error + err := cl.RawForAddress(ctx, a, func(cli *rpcclient.Client) error { + resp, rpcErr = rpc.HashObjectRange(cli, req, rpcclient.WithContext(ctx)) + return rpcErr + }) + if err != nil { + return nil, err + } + return resp, err +} + +func distinctBy[T any, K comparable](source []T, keySelector func(v T) K) []T { + var result []T + dict := make(map[K]struct{}) + for _, v := range source { + key := keySelector(v) + if _, exists := dict[key]; !exists { + result = append(result, v) + dict[key] = struct{}{} + } + } + return result +} diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index d4bce178a..07fe9b42a 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -5,10 +5,15 @@ import ( "errors" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "go.uber.org/zap" ) // Service implements Get operation of Object service v2. @@ -19,10 +24,30 @@ type Service struct { // Option represents Service constructor option. type Option func(*cfg) +type epochSource interface { + Epoch() (uint64, error) +} + +type clientSource interface { + Get(info clientcore.NodeInfo) (clientcore.MultiAddressClient, error) +} + type cfg struct { svc *getsvc.Service keyStorage *objutil.KeyStorage + + epochSource epochSource + + clientSource clientSource + + netmapSource netmap.Source + + announcedKeys netmap.AnnouncedKeys + + contSource container.Source + + log *logger.Logger } // NewService constructs Service instance from provided options. @@ -76,21 +101,6 @@ func (s *Service) GetRange(req *objectV2.GetRangeRequest, stream objectSvc.GetOb } } -// GetRangeHash calls internal service and returns v2 response. -func (s *Service) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) { - p, err := s.toHashRangePrm(req) - if err != nil { - return nil, err - } - - res, err := s.svc.GetRangeHash(ctx, *p) - if err != nil { - return nil, err - } - - return toHashResponse(req.GetBody().GetType(), res), nil -} - // Head serves ForstFS API v2 compatible HEAD requests. func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { resp := new(objectV2.HeadResponse) @@ -125,3 +135,39 @@ func WithKeyStorage(ks *objutil.KeyStorage) Option { c.keyStorage = ks } } + +func WithEpochSource(es epochSource) Option { + return func(c *cfg) { + c.epochSource = es + } +} + +func WithClientSource(cs clientSource) Option { + return func(c *cfg) { + c.clientSource = cs + } +} + +func WithNetmapSource(ns netmap.Source) Option { + return func(c *cfg) { + c.netmapSource = ns + } +} + +func WithNetmapAnnouncedKeys(ak netmap.AnnouncedKeys) Option { + return func(c *cfg) { + c.announcedKeys = ak + } +} + +func WithContainerSource(cs container.Source) Option { + return func(c *cfg) { + c.contSource = cs + } +} + +func WithLogger(l *logger.Logger) Option { + return func(c *cfg) { + c.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get V2 service"))} + } +} diff --git a/pkg/services/object_manager/placement/traverser.go b/pkg/services/object_manager/placement/traverser.go index dc9ab5e7a..53da186e8 100644 --- a/pkg/services/object_manager/placement/traverser.go +++ b/pkg/services/object_manager/placement/traverser.go @@ -181,6 +181,15 @@ func (x Node) PublicKey() []byte { return x.key } +// NewNode creates new Node. +func NewNode(addresses network.AddressGroup, externalAddresses network.AddressGroup, key []byte) Node { + return Node{ + addresses: addresses, + externalAddresses: externalAddresses, + key: key, + } +} + // Next returns next unprocessed address of the object placement. // // Returns nil if no nodes left or traversal operation succeeded.