357 lines
11 KiB
Go
357 lines
11 KiB
Go
package putsvc
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"errors"
|
|
"fmt"
|
|
"hash"
|
|
"sync"
|
|
|
|
objectAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
|
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
|
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
|
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// errInvalidPayloadChecksum is returned when the hash computed over an
// object's payload differs from the checksum value carried by the object.
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
|
|
|
type putSingleRequestSigner struct {
|
|
req *objectAPI.PutSingleRequest
|
|
keyStorage *svcutil.KeyStorage
|
|
signer *sync.Once
|
|
}
|
|
|
|
func (s *putSingleRequestSigner) GetRequestWithSignedHeader() (*objectAPI.PutSingleRequest, error) {
|
|
var resErr error
|
|
s.signer.Do(func() {
|
|
metaHdr := new(sessionV2.RequestMetaHeader)
|
|
meta := s.req.GetMetaHeader()
|
|
|
|
metaHdr.SetTTL(meta.GetTTL() - 1)
|
|
metaHdr.SetOrigin(meta)
|
|
s.req.SetMetaHeader(metaHdr)
|
|
|
|
privateKey, err := s.keyStorage.GetKey(nil)
|
|
if err != nil {
|
|
resErr = err
|
|
return
|
|
}
|
|
resErr = signature.SignServiceMessage(privateKey, s.req)
|
|
})
|
|
return s.req, resErr
|
|
}
|
|
|
|
func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putsvc.PutSingle")
|
|
defer span.End()
|
|
|
|
obj := objectSDK.NewFromV2(req.GetBody().GetObject())
|
|
|
|
meta, err := s.validatePutSingle(ctx, obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.saveToNodes(ctx, obj, req, meta); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp := &objectAPI.PutSingleResponse{}
|
|
resp.SetBody(&objectAPI.PutSingleResponseBody{})
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
|
|
if err := s.validarePutSingleSize(obj); err != nil {
|
|
return object.ContentMeta{}, err
|
|
}
|
|
|
|
if err := s.validatePutSingleChecksum(obj); err != nil {
|
|
return object.ContentMeta{}, err
|
|
}
|
|
|
|
return s.validatePutSingleObject(ctx, obj)
|
|
}
|
|
|
|
func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
|
|
if uint64(len(obj.Payload())) != obj.PayloadSize() {
|
|
return ErrWrongPayloadSize
|
|
}
|
|
|
|
maxAllowedSize := s.maxSizeSrc.MaxObjectSize()
|
|
if obj.PayloadSize() > maxAllowedSize {
|
|
return ErrExceedingMaxSize
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
|
|
cs, csSet := obj.PayloadChecksum()
|
|
if !csSet {
|
|
return errors.New("missing payload checksum")
|
|
}
|
|
|
|
var hash hash.Hash
|
|
|
|
switch typ := cs.Type(); typ {
|
|
default:
|
|
return fmt.Errorf("unsupported payload checksum type %v", typ)
|
|
case checksum.SHA256:
|
|
hash = sha256.New()
|
|
case checksum.TZ:
|
|
hash = tz.New()
|
|
}
|
|
|
|
if _, err := hash.Write(obj.Payload()); err != nil {
|
|
return fmt.Errorf("could not compute payload hash: %w", err)
|
|
}
|
|
|
|
if !bytes.Equal(hash.Sum(nil), cs.Value()) {
|
|
return errInvalidPayloadChecksum
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
|
|
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
|
|
return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err)
|
|
}
|
|
|
|
meta, err := s.fmtValidator.ValidateContent(obj)
|
|
if err != nil {
|
|
return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
|
|
}
|
|
|
|
return meta, nil
|
|
}
|
|
|
|
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
|
localOnly := req.GetMetaHeader().GetTTL() <= 1
|
|
placement, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if placement.isEC {
|
|
return s.saveToECReplicas(ctx, placement, obj, req, meta)
|
|
}
|
|
|
|
return s.saveToREPReplicas(ctx, placement, obj, localOnly, req, meta)
|
|
}
|
|
|
|
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
|
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
|
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
|
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
|
|
|
|
signer := &putSingleRequestSigner{
|
|
req: req,
|
|
keyStorage: s.keyStorage,
|
|
signer: &sync.Once{},
|
|
}
|
|
|
|
return iter.forEachNode(ctx, func(ctx context.Context, nd nodeDesc) error {
|
|
return s.saveToPlacementNode(ctx, &nd, obj, signer, meta)
|
|
})
|
|
}
|
|
|
|
func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
|
commonPrm, err := svcutil.CommonPrmFromV2(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key, err := s.cfg.keyStorage.GetKey(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
signer := &putSingleRequestSigner{
|
|
req: req,
|
|
keyStorage: s.keyStorage,
|
|
signer: &sync.Once{},
|
|
}
|
|
|
|
w := ecWriter{
|
|
cfg: s.cfg,
|
|
placementOpts: placement.placementOptions,
|
|
objMeta: meta,
|
|
objMetaValid: true,
|
|
commonPrm: commonPrm,
|
|
container: placement.container,
|
|
key: key,
|
|
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
|
|
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
|
|
},
|
|
}
|
|
return w.WriteObject(ctx, obj)
|
|
}
|
|
|
|
type putSinglePlacement struct {
|
|
placementOptions []placement.Option
|
|
isEC bool
|
|
container containerSDK.Container
|
|
resetSuccessAfterOnBroadcast bool
|
|
}
|
|
|
|
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) {
|
|
var result putSinglePlacement
|
|
|
|
cnrID, ok := obj.ContainerID()
|
|
if !ok {
|
|
return result, errors.New("missing container ID")
|
|
}
|
|
cnrInfo, err := s.cnrSrc.Get(cnrID)
|
|
if err != nil {
|
|
return result, fmt.Errorf("could not get container by ID: %w", err)
|
|
}
|
|
result.container = cnrInfo.Value
|
|
result.isEC = container.IsECContainer(cnrInfo.Value) && object.IsECSupported(obj)
|
|
if len(copiesNumber) > 0 && !result.isEC {
|
|
result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber))
|
|
}
|
|
if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly {
|
|
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1)))
|
|
result.resetSuccessAfterOnBroadcast = true
|
|
}
|
|
result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value))
|
|
|
|
objID, ok := obj.ID()
|
|
if !ok {
|
|
return result, errors.New("missing object ID")
|
|
}
|
|
if obj.ECHeader() != nil {
|
|
objID = obj.ECHeader().Parent()
|
|
}
|
|
result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
|
|
|
|
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
|
|
if err != nil {
|
|
return result, fmt.Errorf("could not get latest network map: %w", err)
|
|
}
|
|
builder := placement.NewNetworkMapBuilder(latestNetmap)
|
|
if localOnly {
|
|
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
|
|
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
|
|
}
|
|
result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
|
|
return result, nil
|
|
}
|
|
|
|
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
|
|
signer *putSingleRequestSigner, meta object.ContentMeta,
|
|
) error {
|
|
if nodeDesc.local {
|
|
return s.saveLocal(ctx, obj, meta)
|
|
}
|
|
|
|
var info client.NodeInfo
|
|
|
|
client.NodeInfoFromNetmapElement(&info, nodeDesc.info)
|
|
|
|
c, err := s.clientConstructor.Get(info)
|
|
if err != nil {
|
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
|
}
|
|
|
|
return s.redirectPutSingleRequest(ctx, signer, obj, info, c)
|
|
}
|
|
|
|
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
|
|
localTarget := &localTarget{
|
|
storage: s.localStore,
|
|
}
|
|
return localTarget.WriteObject(ctx, obj, meta)
|
|
}
|
|
|
|
func (s *Service) redirectPutSingleRequest(ctx context.Context,
|
|
signer *putSingleRequestSigner,
|
|
obj *objectSDK.Object,
|
|
info client.NodeInfo,
|
|
c client.MultiAddressClient,
|
|
) error {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest")
|
|
defer span.End()
|
|
|
|
var req *objectAPI.PutSingleRequest
|
|
var firstErr error
|
|
req, firstErr = signer.GetRequestWithSignedHeader()
|
|
if firstErr != nil {
|
|
return firstErr
|
|
}
|
|
|
|
info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest.IterateAddresses",
|
|
trace.WithAttributes(
|
|
attribute.String("address", addr.String())))
|
|
defer span.End()
|
|
|
|
var err error
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
objID, _ := obj.ID()
|
|
cnrID, _ := obj.ContainerID()
|
|
s.log.Warn(logs.PutSingleRedirectFailure,
|
|
zap.Error(err),
|
|
zap.Stringer("address", addr),
|
|
zap.Stringer("object_id", objID),
|
|
zap.Stringer("container_id", cnrID),
|
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
|
|
)
|
|
}
|
|
|
|
stop = err == nil
|
|
if stop || firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}()
|
|
|
|
var resp *objectAPI.PutSingleResponse
|
|
|
|
err = c.RawForAddress(ctx, addr, func(cli *rawclient.Client) error {
|
|
var e error
|
|
resp, e = rpc.PutSingleObject(cli, req, rawclient.WithContext(ctx))
|
|
return e
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to execute request: %w", err)
|
|
return
|
|
}
|
|
|
|
if err = internal.VerifyResponseKeyV2(info.PublicKey(), resp); err != nil {
|
|
return
|
|
}
|
|
|
|
err = signature.VerifyServiceMessage(resp)
|
|
if err != nil {
|
|
err = fmt.Errorf("response verification failed: %w", err)
|
|
}
|
|
|
|
return
|
|
})
|
|
|
|
return firstErr
|
|
}
|