Anton Nikiforov
8d589314b5
Once a node has been processed, it is skipped on subsequent passes. Previously, when forming the result, if all nodes were skipped because they had already been processed for a previous REP, the service marked the whole request as incomplete. Examples of policies that are now unblocked: - REP 1 REP 1 CBF 1 - REP 4 IN X REP 4 IN Y CBF 4 SELECT 2 FROM FX AS X SELECT 2 FROM FY AS Y FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS FY FILTER Price GE 0 AS FX Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
385 lines
10 KiB
Go
385 lines
10 KiB
Go
package putsvc
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"errors"
|
|
"fmt"
|
|
"hash"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
objectAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
|
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
|
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
|
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type putSingleRequestSigner struct {
|
|
req *objectAPI.PutSingleRequest
|
|
keyStorage *svcutil.KeyStorage
|
|
signer *sync.Once
|
|
}
|
|
|
|
func (s *putSingleRequestSigner) GetRequestWithSignedHeader() (*objectAPI.PutSingleRequest, error) {
|
|
var resErr error
|
|
s.signer.Do(func() {
|
|
metaHdr := new(sessionV2.RequestMetaHeader)
|
|
meta := s.req.GetMetaHeader()
|
|
|
|
metaHdr.SetTTL(meta.GetTTL() - 1)
|
|
metaHdr.SetOrigin(meta)
|
|
s.req.SetMetaHeader(metaHdr)
|
|
|
|
privateKey, err := s.keyStorage.GetKey(nil)
|
|
if err != nil {
|
|
resErr = err
|
|
return
|
|
}
|
|
resErr = signature.SignServiceMessage(privateKey, s.req)
|
|
})
|
|
return s.req, resErr
|
|
}
|
|
|
|
func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putsvc.PutSingle")
|
|
defer span.End()
|
|
|
|
obj := objectSDK.NewFromV2(req.GetBody().GetObject())
|
|
|
|
meta, err := s.validatePutSingle(ctx, obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.saveToNodes(ctx, obj, req, meta); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp := &objectAPI.PutSingleResponse{}
|
|
resp.SetBody(&objectAPI.PutSingleResponseBody{})
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
|
|
if err := s.validarePutSingleSize(obj); err != nil {
|
|
return object.ContentMeta{}, err
|
|
}
|
|
|
|
if err := s.validatePutSingleChecksum(obj); err != nil {
|
|
return object.ContentMeta{}, err
|
|
}
|
|
|
|
return s.validatePutSingleObject(ctx, obj)
|
|
}
|
|
|
|
func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
|
|
if uint64(len(obj.Payload())) != obj.PayloadSize() {
|
|
return ErrWrongPayloadSize
|
|
}
|
|
|
|
maxAllowedSize := s.maxSizeSrc.MaxObjectSize()
|
|
if obj.PayloadSize() > maxAllowedSize {
|
|
return ErrExceedingMaxSize
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
|
|
cs, csSet := obj.PayloadChecksum()
|
|
if !csSet {
|
|
return errors.New("missing payload checksum")
|
|
}
|
|
|
|
var hash hash.Hash
|
|
|
|
switch typ := cs.Type(); typ {
|
|
default:
|
|
return fmt.Errorf("unsupported payload checksum type %v", typ)
|
|
case checksum.SHA256:
|
|
hash = sha256.New()
|
|
case checksum.TZ:
|
|
hash = tz.New()
|
|
}
|
|
|
|
if _, err := hash.Write(obj.Payload()); err != nil {
|
|
return fmt.Errorf("could not compute payload hash: %w", err)
|
|
}
|
|
|
|
if !bytes.Equal(hash.Sum(nil), cs.Value()) {
|
|
return fmt.Errorf("incorrect payload checksum")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
|
|
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
|
|
return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err)
|
|
}
|
|
|
|
meta, err := s.fmtValidator.ValidateContent(obj)
|
|
if err != nil {
|
|
return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
|
|
}
|
|
|
|
return meta, nil
|
|
}
|
|
|
|
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
|
localOnly := req.GetMetaHeader().GetTTL() <= 1
|
|
placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
traversal := &traversal{
|
|
opts: placementOptions,
|
|
extraBroadcastEnabled: len(obj.Children()) > 0 ||
|
|
(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
|
|
mExclude: make(map[string]*bool),
|
|
}
|
|
signer := &putSingleRequestSigner{
|
|
req: req,
|
|
keyStorage: s.keyStorage,
|
|
signer: &sync.Once{},
|
|
}
|
|
return s.saveAccordingToPlacement(ctx, obj, signer, traversal, meta)
|
|
}
|
|
|
|
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) {
|
|
var result []placement.Option
|
|
if len(copiesNumber) > 0 {
|
|
result = append(result, placement.WithCopyNumbers(copiesNumber))
|
|
}
|
|
|
|
cnrID, ok := obj.ContainerID()
|
|
if !ok {
|
|
return nil, errors.New("missing container ID")
|
|
}
|
|
cnrInfo, err := s.cnrSrc.Get(cnrID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not get container by ID: %w", err)
|
|
}
|
|
result = append(result, placement.ForContainer(cnrInfo.Value))
|
|
|
|
objID, ok := obj.ID()
|
|
if !ok {
|
|
return nil, errors.New("missing object ID")
|
|
}
|
|
result = append(result, placement.ForObject(objID))
|
|
|
|
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not get latest network map: %w", err)
|
|
}
|
|
builder := placement.NewNetworkMapBuilder(latestNetmap)
|
|
if localOnly {
|
|
result = append(result, placement.SuccessAfter(1))
|
|
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
|
|
}
|
|
result = append(result, placement.UseBuilder(builder))
|
|
return result, nil
|
|
}
|
|
|
|
func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.Object, signer *putSingleRequestSigner,
|
|
traversal *traversal, meta object.ContentMeta) error {
|
|
traverser, err := placement.NewTraverser(traversal.opts...)
|
|
if err != nil {
|
|
return fmt.Errorf("could not create object placement traverser: %w", err)
|
|
}
|
|
|
|
var resultError atomic.Value
|
|
for {
|
|
addrs := traverser.Next()
|
|
if len(addrs) == 0 {
|
|
break
|
|
}
|
|
|
|
if stop := s.saveToPlacementNodes(ctx, obj, signer, traversal, traverser, addrs, meta, &resultError); stop {
|
|
break
|
|
}
|
|
}
|
|
|
|
if !traverser.Success() {
|
|
var err errIncompletePut
|
|
err.singleErr, _ = resultError.Load().(error)
|
|
return err
|
|
}
|
|
|
|
if traversal.submitPrimaryPlacementFinish() {
|
|
err = s.saveAccordingToPlacement(ctx, obj, signer, traversal, meta)
|
|
if err != nil {
|
|
s.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) saveToPlacementNodes(ctx context.Context,
|
|
obj *objectSDK.Object,
|
|
signer *putSingleRequestSigner,
|
|
traversal *traversal,
|
|
traverser *placement.Traverser,
|
|
nodeAddresses []placement.Node,
|
|
meta object.ContentMeta,
|
|
resultError *atomic.Value,
|
|
) bool {
|
|
wg := sync.WaitGroup{}
|
|
|
|
for _, nodeAddress := range nodeAddresses {
|
|
nodeAddress := nodeAddress
|
|
if ok := traversal.mExclude[string(nodeAddress.PublicKey())]; ok != nil {
|
|
if *ok {
|
|
traverser.SubmitSuccess()
|
|
}
|
|
continue
|
|
}
|
|
|
|
local := false
|
|
workerPool := s.remotePool
|
|
if s.netmapKeys.IsLocalKey(nodeAddress.PublicKey()) {
|
|
local = true
|
|
workerPool = s.localPool
|
|
}
|
|
|
|
item := new(bool)
|
|
wg.Add(1)
|
|
if err := workerPool.Submit(func() {
|
|
defer wg.Done()
|
|
|
|
err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer, meta)
|
|
|
|
if err != nil {
|
|
resultError.Store(err)
|
|
svcutil.LogServiceError(s.log, "PUT", nodeAddress.Addresses(), err)
|
|
return
|
|
}
|
|
|
|
traverser.SubmitSuccess()
|
|
*item = true
|
|
}); err != nil {
|
|
wg.Done()
|
|
svcutil.LogWorkerPoolError(s.log, "PUT", err)
|
|
return true
|
|
}
|
|
|
|
traversal.submitProcessed(nodeAddress, item)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return false
|
|
}
|
|
|
|
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
|
|
signer *putSingleRequestSigner, meta object.ContentMeta) error {
|
|
if nodeDesc.local {
|
|
return s.saveLocal(ctx, obj, meta)
|
|
}
|
|
|
|
var info client.NodeInfo
|
|
|
|
client.NodeInfoFromNetmapElement(&info, nodeDesc.info)
|
|
|
|
c, err := s.clientConstructor.Get(info)
|
|
if err != nil {
|
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
|
}
|
|
|
|
return s.redirectPutSingleRequest(ctx, signer, obj, info, c)
|
|
}
|
|
|
|
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
|
|
localTarget := &localTarget{
|
|
storage: s.localStore,
|
|
}
|
|
return localTarget.WriteObject(ctx, obj, meta)
|
|
}
|
|
|
|
func (s *Service) redirectPutSingleRequest(ctx context.Context,
|
|
signer *putSingleRequestSigner,
|
|
obj *objectSDK.Object,
|
|
info client.NodeInfo,
|
|
c client.MultiAddressClient) error {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest")
|
|
defer span.End()
|
|
|
|
var req *objectAPI.PutSingleRequest
|
|
var firstErr error
|
|
req, firstErr = signer.GetRequestWithSignedHeader()
|
|
if firstErr != nil {
|
|
return firstErr
|
|
}
|
|
|
|
info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest.IterateAddresses",
|
|
trace.WithAttributes(
|
|
attribute.String("address", addr.String()),
|
|
))
|
|
defer span.End()
|
|
|
|
var err error
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
objID, _ := obj.ID()
|
|
cnrID, _ := obj.ContainerID()
|
|
s.log.Warn("failed to redirect PutSingle request",
|
|
zap.Error(err),
|
|
zap.Stringer("address", addr),
|
|
zap.Stringer("object_id", objID),
|
|
zap.Stringer("container_id", cnrID),
|
|
)
|
|
}
|
|
|
|
stop = err == nil
|
|
if stop || firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}()
|
|
|
|
var resp *objectAPI.PutSingleResponse
|
|
|
|
err = c.RawForAddress(ctx, addr, func(cli *rawclient.Client) error {
|
|
var e error
|
|
resp, e = rpc.PutSingleObject(cli, req, rawclient.WithContext(ctx))
|
|
return e
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to execute request: %w", err)
|
|
return
|
|
}
|
|
|
|
if err = internal.VerifyResponseKeyV2(info.PublicKey(), resp); err != nil {
|
|
return
|
|
}
|
|
|
|
err = signature.VerifyServiceMessage(resp)
|
|
if err != nil {
|
|
err = fmt.Errorf("response verification failed: %w", err)
|
|
}
|
|
|
|
return
|
|
})
|
|
|
|
return firstErr
|
|
}
|