diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go index 200830e151..b5a88c5020 100644 --- a/pkg/services/object/put/single.go +++ b/pkg/services/object/put/single.go @@ -18,6 +18,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -63,11 +64,12 @@ func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest obj := objectSDK.NewFromV2(req.GetBody().GetObject()) - if err := s.validatePutSingle(ctx, obj); err != nil { + meta, err := s.validatePutSingle(ctx, obj) + if err != nil { return nil, err } - if err := s.saveToNodes(ctx, obj, req); err != nil { + if err := s.saveToNodes(ctx, obj, req, meta); err != nil { return nil, err } @@ -76,13 +78,13 @@ func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest return resp, nil } -func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) error { +func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) { if err := s.validarePutSingleSize(obj); err != nil { - return err + return object.ContentMeta{}, err } if err := s.validatePutSingleChecksum(obj); err != nil { - return err + return object.ContentMeta{}, err } return s.validatePutSingleObject(ctx, obj) @@ -129,20 +131,20 @@ func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error { return nil } -func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) error { +func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) { if err := s.fmtValidator.Validate(ctx, obj, false); err != nil { - return fmt.Errorf("coud not validate object format: %w", err) + return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err) } - _, err := s.fmtValidator.ValidateContent(obj) + meta, err := s.fmtValidator.ValidateContent(obj) if err != nil { - return fmt.Errorf("could not validate payload content: %w", err) + return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err) } - return nil + return meta, nil } -func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest) error { +func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { localOnly := req.GetMetaHeader().GetTTL() <= 1 placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly) if err != nil { @@ -160,7 +162,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o keyStorage: s.keyStorage, signer: &sync.Once{}, } - return s.saveAccordingToPlacement(ctx, obj, signer, traversal) + return s.saveAccordingToPlacement(ctx, obj, signer, traversal, meta) } func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) { @@ -198,7 +200,8 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb return result, nil } -func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.Object, signer *putSingleRequestSigner, traversal *traversal) error { +func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.Object, signer *putSingleRequestSigner, + traversal *traversal, meta object.ContentMeta) error { traverser, err := placement.NewTraverser(traversal.opts...) if err != nil { return fmt.Errorf("could not create object placement traverser: %w", err) @@ -211,7 +214,7 @@ func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.O break } - if stop := s.saveToPlacementNodes(ctx, obj, signer, traversal, traverser, addrs, &resultError); stop { + if stop := s.saveToPlacementNodes(ctx, obj, signer, traversal, traverser, addrs, meta, &resultError); stop { break } } @@ -223,7 +226,7 @@ func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.O } if traversal.submitPrimaryPlacementFinish() { - err = s.saveAccordingToPlacement(ctx, obj, signer, traversal) + err = s.saveAccordingToPlacement(ctx, obj, signer, traversal, meta) if err != nil { s.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err)) } @@ -238,6 +241,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context, traversal *traversal, traverser *placement.Traverser, nodeAddresses []placement.Node, + meta object.ContentMeta, resultError *atomic.Value, ) bool { wg := sync.WaitGroup{} @@ -259,7 +263,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context, if err := workerPool.Submit(func() { defer wg.Done() - err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer) + err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer, meta) traversal.submitProcessed(nodeAddress) @@ -282,9 +286,10 @@ func (s *Service) saveToPlacementNodes(ctx context.Context, return false } -func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object, signer *putSingleRequestSigner) error { +func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object, + signer *putSingleRequestSigner, meta object.ContentMeta) error { if nodeDesc.local { - return s.localStore.Put(ctx, obj) + return s.saveLocal(ctx, obj, meta) } var info client.NodeInfo @@ -299,6 +304,13 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, o return s.redirectPutSingleRequest(ctx, signer, obj, info, c) } +func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error { + localTarget := &localTarget{ + storage: s.localStore, + } + return localTarget.WriteObject(ctx, obj, meta) +} + func (s *Service) redirectPutSingleRequest(ctx context.Context, signer *putSingleRequestSigner, obj *objectSDK.Object,