frostfs-node/pkg/services/object/put/single.go

295 lines
8.3 KiB
Go
Raw Normal View History

package putsvc
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"hash"
"sync"
objectAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/tzhash/tz"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// putSingleRequestSigner prepares a PutSingle request for redirection to
// other nodes. The preparation (meta header re-wrapping and signing) is done
// at most once, on the first call of GetRequestWithSignedHeader, and its
// outcome is shared by all subsequent calls.
type putSingleRequestSigner struct {
	req        *objectAPI.PutSingleRequest
	keyStorage *svcutil.KeyStorage
	signer     *sync.Once
	// signErr holds the outcome of the one-time preparation. It is written
	// only inside signer.Do, so every caller (not just the first one)
	// observes a signing failure.
	signErr error
}

// GetRequestWithSignedHeader returns the request prepared for redirection.
//
// On the first call it wraps the request's current meta header into a new
// one (as origin) with TTL decremented by one, and signs the request with the
// key returned by keyStorage.GetKey(nil). Later calls skip that work and
// return the same request together with the error of the first attempt, so a
// request that failed to be signed is never reported as successfully signed.
func (s *putSingleRequestSigner) GetRequestWithSignedHeader() (*objectAPI.PutSingleRequest, error) {
	s.signer.Do(func() {
		origin := s.req.GetMetaHeader()

		metaHdr := new(sessionV2.RequestMetaHeader)
		metaHdr.SetTTL(origin.GetTTL() - 1)
		metaHdr.SetOrigin(origin)
		s.req.SetMetaHeader(metaHdr)

		privateKey, err := s.keyStorage.GetKey(nil)
		if err != nil {
			s.signErr = err
			return
		}
		s.signErr = signature.SignServiceMessage(privateKey, s.req)
	})
	// sync.Once guarantees that the write to signErr inside Do is visible to
	// every caller once Do has returned.
	return s.req, s.signErr
}
// PutSingle handles a PutSingle request: the object carried in the request
// body is validated and then saved to the nodes selected for it. On success a
// response with an empty body is returned; any validation or saving error is
// returned as is.
func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "putsvc.PutSingle")
	defer span.End()

	obj := objectSDK.NewFromV2(req.GetBody().GetObject())

	meta, err := s.validatePutSingle(ctx, obj)
	if err != nil {
		return nil, err
	}

	err = s.saveToNodes(ctx, obj, req, meta)
	if err != nil {
		return nil, err
	}

	resp := new(objectAPI.PutSingleResponse)
	resp.SetBody(new(objectAPI.PutSingleResponseBody))
	return resp, nil
}
// validatePutSingle runs all PutSingle checks on obj in a fixed order:
// payload size, payload checksum, then object format and content. The first
// failing check aborts validation. The content meta produced by the last step
// is returned on success.
func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
	err := s.validarePutSingleSize(obj)
	if err == nil {
		// The checksum is verified only for objects that passed the size check.
		err = s.validatePutSingleChecksum(obj)
	}
	if err != nil {
		return object.ContentMeta{}, err
	}

	return s.validatePutSingleObject(ctx, obj)
}
// validarePutSingleSize checks the payload size of obj.
//
// It returns ErrWrongPayloadSize if the size declared in the object differs
// from the actual payload length, and ErrExceedingMaxSize if the declared
// size is greater than the limit reported by maxSizeSrc.
//
// NOTE: the method name is misspelled ("validare"); it is kept unchanged
// because it is referenced by validatePutSingle.
func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
	declared := obj.PayloadSize()

	if actual := uint64(len(obj.Payload())); actual != declared {
		return ErrWrongPayloadSize
	}
	if limit := s.maxSizeSrc.MaxObjectSize(); declared > limit {
		return ErrExceedingMaxSize
	}
	return nil
}
// validatePutSingleChecksum verifies that the payload checksum stored in obj
// matches the checksum computed over the actual payload.
//
// SHA256 and TZ checksum types are supported. An error is returned if the
// checksum is not set, has any other type, cannot be computed, or does not
// match the stored value.
func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
	cs, csSet := obj.PayloadChecksum()
	if !csSet {
		return errors.New("missing payload checksum")
	}

	// Named hasher (not hash) so the imported hash package is not shadowed.
	var hasher hash.Hash
	switch typ := cs.Type(); typ {
	case checksum.SHA256:
		hasher = sha256.New()
	case checksum.TZ:
		hasher = tz.New()
	default:
		return fmt.Errorf("unsupported payload checksum type %v", typ)
	}

	if _, err := hasher.Write(obj.Payload()); err != nil {
		return fmt.Errorf("could not compute payload hash: %w", err)
	}

	if !bytes.Equal(hasher.Sum(nil), cs.Value()) {
		return errors.New("incorrect payload checksum")
	}
	return nil
}
// validatePutSingleObject validates obj with the service's format validator:
// first the object format, then the payload content. It returns the content
// meta produced by the content validation, or a wrapped error describing
// which of the two steps failed.
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
	if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
		return object.ContentMeta{}, fmt.Errorf("could not validate object format: %w", err)
	}

	meta, err := s.fmtValidator.ValidateContent(obj)
	if err != nil {
		return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
	}
	return meta, nil
}
// saveToNodes stores obj on every node yielded by the node iterator built
// from the object's placement options.
//
// A request whose meta header TTL is 1 or less is treated as local-only.
// Extra broadcast is enabled on the iterator when the object has children,
// or when the request is not local-only and the object is a tombstone or a
// lock. A single request signer is shared by all per-node calls.
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
	localOnly := req.GetMetaHeader().GetTTL() <= 1

	opts, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
	if err != nil {
		return err
	}

	broadcast := len(obj.Children()) > 0
	if !broadcast && !localOnly {
		switch obj.Type() {
		case objectSDK.TypeTombstone, objectSDK.TypeLock:
			broadcast = true
		}
	}

	iter := s.cfg.newNodeIterator(opts)
	iter.extraBroadcastEnabled = broadcast

	signer := &putSingleRequestSigner{
		req:        req,
		keyStorage: s.keyStorage,
		signer:     new(sync.Once),
	}

	return iter.forEachNode(ctx, func(ctx context.Context, nd nodeDesc) error {
		return s.saveToPlacementNode(ctx, &nd, obj, signer, meta)
	})
}
// getPutSinglePlacementOptions assembles the placement options for obj.
//
// The resulting list contains, in this order: the copy numbers (only when
// copiesNumber is non-empty), the object's container, the object ID, a
// success-after-one option (only when localOnly is set) and the placement
// builder. The builder is based on the latest network map and, when localOnly
// is set, is wrapped into a local placement built with the service's netmap
// keys.
//
// An error is returned if the object lacks a container ID or an object ID, or
// if the container or the latest network map cannot be obtained.
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) {
	var opts []placement.Option
	if len(copiesNumber) != 0 {
		opts = append(opts, placement.WithCopyNumbers(copiesNumber))
	}

	cnrID, hasCnrID := obj.ContainerID()
	if !hasCnrID {
		return nil, errors.New("missing container ID")
	}
	cnr, err := s.cnrSrc.Get(cnrID)
	if err != nil {
		return nil, fmt.Errorf("could not get container by ID: %w", err)
	}
	opts = append(opts, placement.ForContainer(cnr.Value))

	objID, hasObjID := obj.ID()
	if !hasObjID {
		return nil, errors.New("missing object ID")
	}
	opts = append(opts, placement.ForObject(objID))

	nm, err := netmap.GetLatestNetworkMap(s.netMapSrc)
	if err != nil {
		return nil, fmt.Errorf("could not get latest network map: %w", err)
	}

	builder := placement.NewNetworkMapBuilder(nm)
	if localOnly {
		opts = append(opts, placement.SuccessAfter(1))
		builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
	}

	return append(opts, placement.UseBuilder(builder)), nil
}
// saveToPlacementNode stores obj on a single placement node. A node marked as
// local is written to directly; for any other node a client is obtained from
// the client constructor and the PutSingle request is redirected to that
// node.
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
	signer *putSingleRequestSigner, meta object.ContentMeta,
) error {
	if nodeDesc.local {
		return s.saveLocal(ctx, obj, meta)
	}

	var remote client.NodeInfo
	client.NodeInfoFromNetmapElement(&remote, nodeDesc.info)

	remoteClient, err := s.clientConstructor.Get(remote)
	if err != nil {
		return fmt.Errorf("could not create SDK client %s: %w", remote.AddressGroup(), err)
	}

	return s.redirectPutSingleRequest(ctx, signer, obj, remote, remoteClient)
}
// saveLocal writes obj together with its content meta through a localTarget
// backed by the service's local store.
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
	target := &localTarget{storage: s.localStore}

	return target.WriteObject(ctx, obj, meta)
}
// redirectPutSingleRequest forwards the PutSingle request to the node
// described by info, trying the node's addresses one by one.
//
// The request to send is taken from signer; if it cannot be prepared, that
// error is returned and nothing is sent. For every address the request is
// executed through c, then the response key is checked against
// info.PublicKey() and the response signature is verified. A failed attempt
// is logged as a warning and the callback asks the iteration to continue; a
// successful attempt clears the recorded error and asks the iteration to
// stop.
//
// The returned error is the one recorded by the iteration: nil after a
// successful attempt, otherwise the error of the first failed attempt.
// NOTE(review): this relies on IterateAddresses honoring the callback's stop
// result - confirm against its implementation.
func (s *Service) redirectPutSingleRequest(ctx context.Context,
	signer *putSingleRequestSigner,
	obj *objectSDK.Object,
	info client.NodeInfo,
	c client.MultiAddressClient,
) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest")
	defer span.End()

	req, firstErr := signer.GetRequestWithSignedHeader()
	if firstErr != nil {
		return firstErr
	}

	// send performs one attempt against a single address and verifies the
	// response it gets back.
	send := func(ctx context.Context, addr network.Address) error {
		var resp *objectAPI.PutSingleResponse

		err := c.RawForAddress(ctx, addr, func(cli *rawclient.Client) (e error) {
			resp, e = rpc.PutSingleObject(cli, req, rawclient.WithContext(ctx))
			return e
		})
		if err != nil {
			return fmt.Errorf("failed to execute request: %w", err)
		}

		if err := internal.VerifyResponseKeyV2(info.PublicKey(), resp); err != nil {
			return err
		}

		if err := signature.VerifyServiceMessage(resp); err != nil {
			return fmt.Errorf("response verification failed: %w", err)
		}
		return nil
	}

	info.AddressGroup().IterateAddresses(func(addr network.Address) bool {
		attemptCtx, attemptSpan := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest.IterateAddresses",
			trace.WithAttributes(
				attribute.String("address", addr.String()),
			))
		defer attemptSpan.End()

		err := send(attemptCtx, addr)
		if err == nil {
			// Success supersedes any error recorded for previous addresses.
			firstErr = nil
			return true
		}

		objID, _ := obj.ID()
		cnrID, _ := obj.ContainerID()
		s.log.Warn(logs.PutSingleRedirectFailure,
			zap.Error(err),
			zap.Stringer("address", addr),
			zap.Stringer("object_id", objID),
			zap.Stringer("container_id", cnrID),
		)

		// Only the first failure is reported to the caller.
		if firstErr == nil {
			firstErr = err
		}
		return false
	})

	return firstErr
}