Fix PutSingle implementation #546
1 changed files with 30 additions and 18 deletions
|
@ -18,6 +18,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||||
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
@ -63,11 +64,12 @@ func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest
|
||||||
|
|
||||||
obj := objectSDK.NewFromV2(req.GetBody().GetObject())
|
obj := objectSDK.NewFromV2(req.GetBody().GetObject())
|
||||||
|
|
||||||
if err := s.validatePutSingle(ctx, obj); err != nil {
|
meta, err := s.validatePutSingle(ctx, obj)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.saveToNodes(ctx, obj, req); err != nil {
|
if err := s.saveToNodes(ctx, obj, req, meta); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,13 +78,13 @@ func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) error {
|
func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
|
||||||
if err := s.validarePutSingleSize(obj); err != nil {
|
if err := s.validarePutSingleSize(obj); err != nil {
|
||||||
return err
|
return object.ContentMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.validatePutSingleChecksum(obj); err != nil {
|
if err := s.validatePutSingleChecksum(obj); err != nil {
|
||||||
return err
|
return object.ContentMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.validatePutSingleObject(ctx, obj)
|
return s.validatePutSingleObject(ctx, obj)
|
||||||
|
@ -129,20 +131,20 @@ func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) error {
|
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
|
||||||
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
|
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
|
||||||
return fmt.Errorf("coud not validate object format: %w", err)
|
return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := s.fmtValidator.ValidateContent(obj)
|
meta, err := s.fmtValidator.ValidateContent(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not validate payload content: %w", err)
|
return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return meta, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest) error {
|
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||||
localOnly := req.GetMetaHeader().GetTTL() <= 1
|
localOnly := req.GetMetaHeader().GetTTL() <= 1
|
||||||
placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
|
placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -160,7 +162,7 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
||||||
keyStorage: s.keyStorage,
|
keyStorage: s.keyStorage,
|
||||||
signer: &sync.Once{},
|
signer: &sync.Once{},
|
||||||
}
|
}
|
||||||
return s.saveAccordingToPlacement(ctx, obj, signer, traversal)
|
return s.saveAccordingToPlacement(ctx, obj, signer, traversal, meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) {
|
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) {
|
||||||
|
@ -198,7 +200,8 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.Object, signer *putSingleRequestSigner, traversal *traversal) error {
|
func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.Object, signer *putSingleRequestSigner,
|
||||||
|
traversal *traversal, meta object.ContentMeta) error {
|
||||||
traverser, err := placement.NewTraverser(traversal.opts...)
|
traverser, err := placement.NewTraverser(traversal.opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not create object placement traverser: %w", err)
|
return fmt.Errorf("could not create object placement traverser: %w", err)
|
||||||
|
@ -211,7 +214,7 @@ func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.O
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if stop := s.saveToPlacementNodes(ctx, obj, signer, traversal, traverser, addrs, &resultError); stop {
|
if stop := s.saveToPlacementNodes(ctx, obj, signer, traversal, traverser, addrs, meta, &resultError); stop {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -223,7 +226,7 @@ func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.O
|
||||||
}
|
}
|
||||||
|
|
||||||
if traversal.submitPrimaryPlacementFinish() {
|
if traversal.submitPrimaryPlacementFinish() {
|
||||||
err = s.saveAccordingToPlacement(ctx, obj, signer, traversal)
|
err = s.saveAccordingToPlacement(ctx, obj, signer, traversal, meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
s.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -238,6 +241,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
|
||||||
traversal *traversal,
|
traversal *traversal,
|
||||||
traverser *placement.Traverser,
|
traverser *placement.Traverser,
|
||||||
nodeAddresses []placement.Node,
|
nodeAddresses []placement.Node,
|
||||||
|
meta object.ContentMeta,
|
||||||
resultError *atomic.Value,
|
resultError *atomic.Value,
|
||||||
) bool {
|
) bool {
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
|
@ -259,7 +263,7 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
|
||||||
if err := workerPool.Submit(func() {
|
if err := workerPool.Submit(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer)
|
err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer, meta)
|
||||||
|
|
||||||
traversal.submitProcessed(nodeAddress)
|
traversal.submitProcessed(nodeAddress)
|
||||||
|
|
||||||
|
@ -282,9 +286,10 @@ func (s *Service) saveToPlacementNodes(ctx context.Context,
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object, signer *putSingleRequestSigner) error {
|
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
|
||||||
|
signer *putSingleRequestSigner, meta object.ContentMeta) error {
|
||||||
if nodeDesc.local {
|
if nodeDesc.local {
|
||||||
return s.localStore.Put(ctx, obj)
|
return s.saveLocal(ctx, obj, meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
var info client.NodeInfo
|
var info client.NodeInfo
|
||||||
|
@ -299,6 +304,13 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, o
|
||||||
return s.redirectPutSingleRequest(ctx, signer, obj, info, c)
|
return s.redirectPutSingleRequest(ctx, signer, obj, info, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
|
||||||
|
localTarget := &localTarget{
|
||||||
|
storage: s.localStore,
|
||||||
|
}
|
||||||
|
return localTarget.WriteObject(ctx, obj, meta)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) redirectPutSingleRequest(ctx context.Context,
|
func (s *Service) redirectPutSingleRequest(ctx context.Context,
|
||||||
signer *putSingleRequestSigner,
|
signer *putSingleRequestSigner,
|
||||||
obj *objectSDK.Object,
|
obj *objectSDK.Object,
|
||||||
|
|
Loading…
Reference in a new issue