frostfs-node/pkg/services/object/put/single.go
Anton Nikiforov 8d589314b5 [#560] node: Fix Put in multi REP with intersecting sets of nodes
Once a node had been processed, it was skipped for subsequent REPs. When
forming the result, if all nodes were skipped because they had already been
processed for a previous REP, the service marked the whole request as incomplete.

Example of policies which are unblocked:
- REP 1 REP 1 CBF 1
- REP 4 IN X REP 4 IN Y
  CBF 4
  SELECT 2 FROM FX AS X SELECT 2 FROM FY AS Y
  FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS FY
  FILTER Price GE 0 AS FX

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-08-08 10:22:53 +00:00

385 lines
10 KiB
Go

package putsvc
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"hash"
"sync"
"sync/atomic"
objectAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// putSingleRequestSigner prepares a PutSingle request for forwarding to other
// nodes. The preparation (meta header rewrite + signing) is performed at most
// once, no matter how many goroutines ask for the request.
type putSingleRequestSigner struct {
	req        *objectAPI.PutSingleRequest
	keyStorage *svcutil.KeyStorage
	signer     *sync.Once
	// signErr is the outcome of the one-time preparation. It is written only
	// inside signer.Do and read only after Do returns, so every caller
	// observes the same result.
	signErr error
}

// GetRequestWithSignedHeader returns the request with a new meta header (TTL
// decremented by one, the previous meta header set as origin) and signed with
// the key obtained from the key storage.
//
// The preparation runs once; its error is remembered and returned to every
// caller. Previously the error lived in a per-call local variable, so only the
// first caller saw a failure while all later callers received a nil error
// together with an unsigned request.
func (s *putSingleRequestSigner) GetRequestWithSignedHeader() (*objectAPI.PutSingleRequest, error) {
	s.signer.Do(func() {
		metaHdr := new(sessionV2.RequestMetaHeader)
		meta := s.req.GetMetaHeader()

		metaHdr.SetTTL(meta.GetTTL() - 1)
		metaHdr.SetOrigin(meta)
		s.req.SetMetaHeader(metaHdr)

		// NOTE(review): a nil argument presumably selects the node's own key —
		// confirm against KeyStorage.GetKey.
		privateKey, err := s.keyStorage.GetKey(nil)
		if err != nil {
			s.signErr = err
			return
		}
		s.signErr = signature.SignServiceMessage(privateKey, s.req)
	})
	return s.req, s.signErr
}
// PutSingle handles a PutSingle request: the object carried in the request
// body is validated and then saved according to the placement. On success a
// response with an empty body is returned; any validation or saving error is
// returned as is.
func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "putsvc.PutSingle")
	defer span.End()

	obj := objectSDK.NewFromV2(req.GetBody().GetObject())

	meta, err := s.validatePutSingle(ctx, obj)
	if err != nil {
		return nil, err
	}

	if err = s.saveToNodes(ctx, obj, req, meta); err != nil {
		return nil, err
	}

	resp := new(objectAPI.PutSingleResponse)
	resp.SetBody(new(objectAPI.PutSingleResponseBody))
	return resp, nil
}
// validatePutSingle runs all PutSingle checks in order: payload size, payload
// checksum, then object format and content. The first failure is returned
// together with a zero ContentMeta; on success the content meta produced by
// the last check is returned.
func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
	err := s.validarePutSingleSize(obj)
	if err == nil {
		err = s.validatePutSingleChecksum(obj)
	}
	if err != nil {
		return object.ContentMeta{}, err
	}

	return s.validatePutSingleObject(ctx, obj)
}
// validarePutSingleSize checks the payload size of the object.
//
// It returns ErrWrongPayloadSize when the size declared in the header differs
// from the actual payload length, and ErrExceedingMaxSize when the declared
// size is above the limit reported by the max size source.
func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
	declared := obj.PayloadSize()
	if declared != uint64(len(obj.Payload())) {
		return ErrWrongPayloadSize
	}

	if limit := s.maxSizeSrc.MaxObjectSize(); declared > limit {
		return ErrExceedingMaxSize
	}

	return nil
}
// validatePutSingleChecksum recomputes the payload hash with the algorithm
// named by the object's payload checksum (SHA256 or TZ) and compares it with
// the declared value.
//
// An error is returned when the checksum is missing, its type is not
// supported, hashing fails, or the values differ.
func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
	declared, present := obj.PayloadChecksum()
	if !present {
		return errors.New("missing payload checksum")
	}

	var hasher hash.Hash
	switch algo := declared.Type(); algo {
	case checksum.SHA256:
		hasher = sha256.New()
	case checksum.TZ:
		hasher = tz.New()
	default:
		return fmt.Errorf("unsupported payload checksum type %v", algo)
	}

	if _, err := hasher.Write(obj.Payload()); err != nil {
		return fmt.Errorf("could not compute payload hash: %w", err)
	}

	if computed := hasher.Sum(nil); !bytes.Equal(computed, declared.Value()) {
		return errors.New("incorrect payload checksum")
	}

	return nil
}
// validatePutSingleObject validates the object with the format validator:
// first the object format, then the payload content.
//
// On success the content meta returned by the content validation is passed
// back to the caller. On failure a zero ContentMeta is returned together with
// the wrapped validator error.
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
	// NOTE(review): the meaning of the trailing `false` flag is defined by the
	// format validator, which is not visible here — confirm before changing it.
	if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
		return object.ContentMeta{}, fmt.Errorf("could not validate object format: %w", err)
	}

	meta, err := s.fmtValidator.ValidateContent(obj)
	if err != nil {
		return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
	}

	return meta, nil
}
// saveToNodes builds the placement traversal state and the lazy request signer
// for the object and hands both to saveAccordingToPlacement.
//
// A request whose meta header TTL is at most 1 is handled as local-only. The
// extra broadcast is enabled when the object has children, or when the request
// is not local-only and the object is a tombstone or a lock.
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
	localOnly := req.GetMetaHeader().GetTTL() <= 1

	opts, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
	if err != nil {
		return err
	}

	needBroadcast := len(obj.Children()) > 0
	if !needBroadcast && !localOnly {
		switch obj.Type() {
		case objectSDK.TypeTombstone, objectSDK.TypeLock:
			needBroadcast = true
		}
	}

	state := &traversal{
		opts:                  opts,
		extraBroadcastEnabled: needBroadcast,
		mExclude:              make(map[string]*bool),
	}

	reqSigner := &putSingleRequestSigner{
		req:        req,
		keyStorage: s.keyStorage,
		signer:     new(sync.Once),
	}

	return s.saveAccordingToPlacement(ctx, obj, reqSigner, state, meta)
}
// getPutSinglePlacementOptions assembles the placement options for the object.
//
// The options are appended in this order: copy numbers (only when
// copiesNumber is non-empty), container, object, success-after-one (only when
// localOnly is set) and the placement builder. With localOnly the network map
// builder is wrapped into a local placement builder.
//
// An error is returned when the object has no container ID or object ID, or
// when the container or the latest network map cannot be obtained.
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) {
	// At most five options are ever collected.
	opts := make([]placement.Option, 0, 5)

	if len(copiesNumber) != 0 {
		opts = append(opts, placement.WithCopyNumbers(copiesNumber))
	}

	containerID, hasContainer := obj.ContainerID()
	if !hasContainer {
		return nil, errors.New("missing container ID")
	}

	container, err := s.cnrSrc.Get(containerID)
	if err != nil {
		return nil, fmt.Errorf("could not get container by ID: %w", err)
	}
	opts = append(opts, placement.ForContainer(container.Value))

	objectID, hasObject := obj.ID()
	if !hasObject {
		return nil, errors.New("missing object ID")
	}
	opts = append(opts, placement.ForObject(objectID))

	nm, err := netmap.GetLatestNetworkMap(s.netMapSrc)
	if err != nil {
		return nil, fmt.Errorf("could not get latest network map: %w", err)
	}

	builder := placement.NewNetworkMapBuilder(nm)
	if localOnly {
		opts = append(opts, placement.SuccessAfter(1))
		builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
	}

	return append(opts, placement.UseBuilder(builder)), nil
}
// saveAccordingToPlacement walks the placement traverser built from
// traversal.opts and saves the object to every batch of nodes it yields, until
// the traverser is exhausted or saveToPlacementNodes asks to stop.
//
// If the traverser does not report success, errIncompletePut is returned; it
// carries the error recorded by the node workers, if any. Otherwise, when
// traversal.submitPrimaryPlacementFinish reports true, one more pass is made
// recursively; a failure of that pass is only logged and does not affect the
// returned result.
func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.Object, signer *putSingleRequestSigner,
	traversal *traversal, meta object.ContentMeta) error {
	traverser, err := placement.NewTraverser(traversal.opts...)
	if err != nil {
		return fmt.Errorf("could not create object placement traverser: %w", err)
	}

	var nodeErr atomic.Value

	for batch := traverser.Next(); len(batch) != 0; batch = traverser.Next() {
		if s.saveToPlacementNodes(ctx, obj, signer, traversal, traverser, batch, meta, &nodeErr) {
			break
		}
	}

	if !traverser.Success() {
		var incomplete errIncompletePut
		// Nothing may have been recorded; singleErr then stays nil.
		if recorded, ok := nodeErr.Load().(error); ok {
			incomplete.singleErr = recorded
		}
		return incomplete
	}

	if !traversal.submitPrimaryPlacementFinish() {
		return nil
	}

	// Additional pass: its failure is reported in the log only.
	if broadcastErr := s.saveAccordingToPlacement(ctx, obj, signer, traversal, meta); broadcastErr != nil {
		s.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(broadcastErr))
	}

	return nil
}
// saveToPlacementNodes saves the object to one batch of placement nodes in
// parallel and waits for all submitted tasks to finish.
//
// Nodes already present in traversal.mExclude are not contacted again; if the
// earlier attempt succeeded, the success is submitted to the traverser once
// more so that the node counts for the current pass too. Local nodes are
// served by the local worker pool, all others by the remote one.
//
// Errors of failed nodes are stored into resultError. The return value is true
// when processing must stop because a task could not be submitted to a worker
// pool, and false otherwise.
func (s *Service) saveToPlacementNodes(ctx context.Context,
	obj *objectSDK.Object,
	signer *putSingleRequestSigner,
	traversal *traversal,
	traverser *placement.Traverser,
	nodeAddresses []placement.Node,
	meta object.ContentMeta,
	resultError *atomic.Value,
) bool {
	wg := sync.WaitGroup{}

	for _, nodeAddress := range nodeAddresses {
		nodeAddress := nodeAddress // per-iteration copy captured by the task closure

		if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil {
			if *ok {
				traverser.SubmitSuccess()
			}
			// The node has already been processed, do not contact it again.
			continue
		}

		local := false
		workerPool := s.remotePool
		if s.netmapKeys.IsLocalKey(nodeAddress.PublicKey()) {
			local = true
			workerPool = s.localPool
		}

		// item is set to true by the task on success and is recorded in the
		// traversal state below.
		item := new(bool)

		wg.Add(1)
		if err := workerPool.Submit(func() {
			defer wg.Done()

			err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer, meta)
			if err != nil {
				// atomic.Value panics when values of different concrete types
				// are stored. Node errors come in arbitrary concrete types, so
				// each one is wrapped to make the stored type uniform. The
				// message and the Unwrap chain are unchanged.
				resultError.Store(fmt.Errorf("%w", err))
				svcutil.LogServiceError(s.log, "PUT", nodeAddress.Addresses(), err)
				return
			}

			traverser.SubmitSuccess()
			*item = true
		}); err != nil {
			// The task was never started, so balance the Add above.
			wg.Done()
			svcutil.LogWorkerPoolError(s.log, "PUT", err)
			// NOTE(review): tasks submitted earlier in this batch may still be
			// running when we return here, since wg.Wait is skipped — confirm
			// this is intended.
			return true
		}

		traversal.submitProcessed(nodeAddress, item)
	}

	wg.Wait()

	return false
}
// saveToPlacementNode saves the object to a single node. A local node is
// served by saveLocal. For any other node a client is obtained from the client
// constructor and the PutSingle request is redirected to it.
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
	signer *putSingleRequestSigner, meta object.ContentMeta) error {
	if !nodeDesc.local {
		var remote client.NodeInfo
		client.NodeInfoFromNetmapElement(&remote, nodeDesc.info)

		conn, err := s.clientConstructor.Get(remote)
		if err != nil {
			return fmt.Errorf("could not create SDK client %s: %w", remote.AddressGroup(), err)
		}

		return s.redirectPutSingleRequest(ctx, signer, obj, remote, conn)
	}

	return s.saveLocal(ctx, obj, meta)
}
// saveLocal writes the object with its content meta through a local target
// backed by the service's local storage.
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
	target := &localTarget{storage: s.localStore}

	return target.WriteObject(ctx, obj, meta)
}
// redirectPutSingleRequest forwards the signed PutSingle request to the node
// described by info, trying its addresses one by one until an attempt
// succeeds.
//
// An attempt succeeds when the RPC is executed, the response key matches the
// node's public key and the response signature is verified. Every failed
// attempt is logged with a warning. The result is nil if some attempt
// succeeded, otherwise the error of the first failed attempt. An error of
// request signing is returned immediately.
func (s *Service) redirectPutSingleRequest(ctx context.Context,
	signer *putSingleRequestSigner,
	obj *objectSDK.Object,
	info client.NodeInfo,
	c client.MultiAddressClient) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest")
	defer span.End()

	req, firstErr := signer.GetRequestWithSignedHeader()
	if firstErr != nil {
		return firstErr
	}

	// sendTo performs a single attempt against one address.
	sendTo := func(ctx context.Context, addr network.Address) error {
		var resp *objectAPI.PutSingleResponse

		rpcErr := c.RawForAddress(ctx, addr, func(cli *rawclient.Client) error {
			var callErr error
			resp, callErr = rpc.PutSingleObject(cli, req, rawclient.WithContext(ctx))
			return callErr
		})
		if rpcErr != nil {
			return fmt.Errorf("failed to execute request: %w", rpcErr)
		}

		if keyErr := internal.VerifyResponseKeyV2(info.PublicKey(), resp); keyErr != nil {
			return keyErr
		}

		if sigErr := signature.VerifyServiceMessage(resp); sigErr != nil {
			return fmt.Errorf("response verification failed: %w", sigErr)
		}

		return nil
	}

	info.AddressGroup().IterateAddresses(func(addr network.Address) bool {
		addrCtx, addrSpan := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest.IterateAddresses",
			trace.WithAttributes(
				attribute.String("address", addr.String()),
			))
		defer addrSpan.End()

		err := sendTo(addrCtx, addr)
		if err != nil {
			objID, _ := obj.ID()
			cnrID, _ := obj.ContainerID()

			s.log.Warn("failed to redirect PutSingle request",
				zap.Error(err),
				zap.Stringer("address", addr),
				zap.Stringer("object_id", objID),
				zap.Stringer("container_id", cnrID),
			)
		}

		delivered := err == nil
		// A success clears any earlier failure; among failures only the first
		// one is kept.
		if delivered || firstErr == nil {
			firstErr = err
		}

		// Stop iterating as soon as one address accepted the request.
		return delivered
	})

	return firstErr
}