package putsvc import ( "bytes" "context" "crypto/sha256" "errors" "fmt" "hash" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target" objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectAPI "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc" rawclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/signature" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/tzhash/tz" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) var errInvalidPayloadChecksum = errors.New("incorrect payload checksum") type putSingleRequestSigner struct { req *objectAPI.PutSingleRequest keyStorage *svcutil.KeyStorage signer *sync.Once } func (s *putSingleRequestSigner) GetRequestWithSignedHeader() (*objectAPI.PutSingleRequest, error) { var resErr error s.signer.Do(func() { metaHdr := new(sessionV2.RequestMetaHeader) meta := s.req.GetMetaHeader() metaHdr.SetTTL(meta.GetTTL() - 1) metaHdr.SetOrigin(meta) s.req.SetMetaHeader(metaHdr) privateKey, err := s.keyStorage.GetKey(nil) if err != nil { resErr = err return } resErr = signature.SignServiceMessage(privateKey, s.req) }) return s.req, resErr } func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) { ctx, span := tracing.StartSpanFromContext(ctx, "putsvc.PutSingle") defer span.End() obj := objectSDK.NewFromV2(req.GetBody().GetObject()) meta, err := s.validatePutSingle(ctx, obj) if err != nil { return nil, err } if err := s.saveToNodes(ctx, obj, req, meta); err != nil { return nil, err } resp := &objectAPI.PutSingleResponse{} resp.SetBody(&objectAPI.PutSingleResponseBody{}) return resp, nil } func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) { if err := s.validarePutSingleSize(obj); err != nil { return object.ContentMeta{}, err } if err := s.validatePutSingleChecksum(obj); err != nil { return object.ContentMeta{}, err } return s.validatePutSingleObject(ctx, obj) } func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error { if uint64(len(obj.Payload())) != obj.PayloadSize() { return target.ErrWrongPayloadSize } maxAllowedSize := s.Config.MaxSizeSrc.MaxObjectSize() if obj.PayloadSize() > maxAllowedSize { return target.ErrExceedingMaxSize } return nil } func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error { cs, csSet := obj.PayloadChecksum() if !csSet { return errors.New("missing payload checksum") } var hash hash.Hash switch typ := cs.Type(); typ { default: return fmt.Errorf("unsupported payload checksum type %v", typ) case checksum.SHA256: hash = sha256.New() case checksum.TZ: hash = tz.New() } if _, err := hash.Write(obj.Payload()); err != nil { return fmt.Errorf("could not compute payload hash: %w", err) } if !bytes.Equal(hash.Sum(nil), cs.Value()) { return errInvalidPayloadChecksum } return nil } func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) { if err := s.FormatValidator.Validate(ctx, obj, false); err != nil { return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err) } meta, err := s.FormatValidator.ValidateContent(obj) if err != nil { return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err) } return meta, nil } func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { localOnly := req.GetMetaHeader().GetTTL() <= 1 placement, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly) if err != nil { return err } if placement.isEC { return s.saveToECReplicas(ctx, placement, obj, req, meta) } return s.saveToREPReplicas(ctx, placement, obj, localOnly, req, meta) } func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { iter := s.Config.NewNodeIterator(placement.placementOptions) iter.ExtraBroadcastEnabled = objectwriter.NeedAdditionalBroadcast(obj, localOnly) iter.ResetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast signer := &putSingleRequestSigner{ req: req, keyStorage: s.Config.KeyStorage, signer: &sync.Once{}, } return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error { return s.saveToPlacementNode(ctx, &nd, obj, signer, meta, placement.container) }) } func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { commonPrm, err := svcutil.CommonPrmFromV2(req) if err != nil { return err } key, err := s.Config.KeyStorage.GetKey(nil) if err != nil { return err } signer := &putSingleRequestSigner{ req: req, keyStorage: s.Config.KeyStorage, signer: &sync.Once{}, } w := objectwriter.ECWriter{ Config: s.Config, PlacementOpts: placement.placementOptions, ObjectMeta: meta, ObjectMetaValid: true, CommonPrm: commonPrm, Container: placement.container, Key: key, Relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error { return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac) }, } return w.WriteObject(ctx, obj) } type putSinglePlacement struct { placementOptions []placement.Option isEC bool container containerSDK.Container resetSuccessAfterOnBroadcast bool } func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) (putSinglePlacement, error) { var result putSinglePlacement cnrID, ok := obj.ContainerID() if !ok { return result, errors.New("missing container ID") } cnrInfo, err := s.Config.ContainerSource.Get(cnrID) if err != nil { return result, fmt.Errorf("could not get container by ID: %w", err) } result.container = cnrInfo.Value result.isEC = container.IsECContainer(cnrInfo.Value) && object.IsECSupported(obj) if len(copiesNumber) > 0 && !result.isEC { result.placementOptions = append(result.placementOptions, placement.WithCopyNumbers(copiesNumber)) } if container.IsECContainer(cnrInfo.Value) && !object.IsECSupported(obj) && !localOnly { result.placementOptions = append(result.placementOptions, placement.SuccessAfter(uint32(policy.ECParityCount(cnrInfo.Value.PlacementPolicy())+1))) result.resetSuccessAfterOnBroadcast = true } result.placementOptions = append(result.placementOptions, placement.ForContainer(cnrInfo.Value)) objID, ok := obj.ID() if !ok { return result, errors.New("missing object ID") } if obj.ECHeader() != nil { objID = obj.ECHeader().Parent() } result.placementOptions = append(result.placementOptions, placement.ForObject(objID)) latestNetmap, err := netmap.GetLatestNetworkMap(s.Config.NetmapSource) if err != nil { return result, fmt.Errorf("could not get latest network map: %w", err) } builder := placement.NewNetworkMapBuilder(latestNetmap) if localOnly { result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1)) builder = svcutil.NewLocalPlacement(builder, s.Config.NetmapKeys) } result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder)) return result, nil } func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object, signer *putSingleRequestSigner, meta object.ContentMeta, container containerSDK.Container, ) error { if nodeDesc.Local { return s.saveLocal(ctx, obj, meta, container) } var info client.NodeInfo client.NodeInfoFromNetmapElement(&info, nodeDesc.Info) c, err := s.Config.ClientConstructor.Get(info) if err != nil { return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) } return s.redirectPutSingleRequest(ctx, signer, obj, info, c) } func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta, container containerSDK.Container) error { localTarget := &objectwriter.LocalTarget{ Storage: s.Config.LocalStore, Container: container, } return localTarget.WriteObject(ctx, obj, meta) } func (s *Service) redirectPutSingleRequest(ctx context.Context, signer *putSingleRequestSigner, obj *objectSDK.Object, info client.NodeInfo, c client.MultiAddressClient, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest") defer span.End() var req *objectAPI.PutSingleRequest var firstErr error req, firstErr = signer.GetRequestWithSignedHeader() if firstErr != nil { return firstErr } info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) { ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest.IterateAddresses", trace.WithAttributes( attribute.String("address", addr.String()))) defer span.End() var err error defer func() { if err != nil { objID, _ := obj.ID() cnrID, _ := obj.ContainerID() s.Config.Logger.Warn(ctx, logs.PutSingleRedirectFailure, zap.Error(err), zap.Stringer("address", addr), zap.Stringer("object_id", objID), zap.Stringer("container_id", cnrID), zap.String("trace_id", tracingPkg.GetTraceID(ctx)), ) } stop = err == nil if stop || firstErr == nil { firstErr = err } }() var resp *objectAPI.PutSingleResponse err = c.RawForAddress(ctx, addr, func(cli *rawclient.Client) error { var e error resp, e = rpc.PutSingleObject(cli, req, rawclient.WithContext(ctx)) return e }) if err != nil { err = fmt.Errorf("failed to execute request: %w", err) return } if err = internal.VerifyResponseKeyV2(info.PublicKey(), resp); err != nil { return } err = signature.VerifyServiceMessage(resp) if err != nil { err = fmt.Errorf("response verification failed: %w", err) } return }) return firstErr }