forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
2 changed files with 371 additions and 2 deletions
369
pkg/services/object/put/single.go
Normal file
369
pkg/services/object/put/single.go
Normal file
|
@ -0,0 +1,369 @@
|
|||
package putsvc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
objectAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type putSingleRequestSigner struct {
|
||||
req *objectAPI.PutSingleRequest
|
||||
keyStorage *svcutil.KeyStorage
|
||||
signer *sync.Once
|
||||
}
|
||||
|
||||
func (s *putSingleRequestSigner) GetRequestWithSignedHeader() (*objectAPI.PutSingleRequest, error) {
|
||||
var resErr error
|
||||
s.signer.Do(func() {
|
||||
metaHdr := new(sessionV2.RequestMetaHeader)
|
||||
meta := s.req.GetMetaHeader()
|
||||
|
||||
metaHdr.SetTTL(meta.GetTTL() - 1)
|
||||
metaHdr.SetOrigin(meta)
|
||||
s.req.SetMetaHeader(metaHdr)
|
||||
|
||||
privateKey, err := s.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
resErr = err
|
||||
return
|
||||
}
|
||||
resErr = signature.SignServiceMessage(privateKey, s.req)
|
||||
})
|
||||
return s.req, resErr
|
||||
}
|
||||
|
||||
func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "putsvc.PutSingle")
|
||||
defer span.End()
|
||||
|
||||
obj := objectSDK.NewFromV2(req.GetBody().GetObject())
|
||||
|
||||
if err := s.validatePutSingle(ctx, obj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.saveToNodes(ctx, obj, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp := &objectAPI.PutSingleResponse{}
|
||||
resp.SetBody(&objectAPI.PutSingleResponseBody{})
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) error {
|
||||
if err := s.validarePutSingleSize(obj); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.validatePutSingleChecksum(obj); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.validatePutSingleObject(ctx, obj)
|
||||
}
|
||||
|
||||
func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
|
||||
if uint64(len(obj.Payload())) != obj.PayloadSize() {
|
||||
return ErrWrongPayloadSize
|
||||
}
|
||||
|
||||
maxAllowedSize := s.maxSizeSrc.MaxObjectSize()
|
||||
if obj.PayloadSize() > maxAllowedSize {
|
||||
return ErrExceedingMaxSize
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
|
||||
cs, csSet := obj.PayloadChecksum()
|
||||
if !csSet {
|
||||
return errors.New("missing payload checksum")
|
||||
}
|
||||
|
||||
var hash hash.Hash
|
||||
|
||||
switch typ := cs.Type(); typ {
|
||||
default:
|
||||
return fmt.Errorf("unsupported payload checksum type %v", typ)
|
||||
case checksum.SHA256:
|
||||
hash = sha256.New()
|
||||
case checksum.TZ:
|
||||
hash = tz.New()
|
||||
}
|
||||
|
||||
if _, err := hash.Write(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not compute payload hash: %w", err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(hash.Sum(nil), cs.Value()) {
|
||||
return fmt.Errorf("incorrect payload checksum")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
|
||||
return fmt.Errorf("coult not validate object format: %w", err)
|
||||
}
|
||||
|
||||
_, err := s.fmtValidator.ValidateContent(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not validate payload content: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest) error {
|
||||
localOnly := req.GetMetaHeader().GetTTL() <= 1
|
||||
placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
traversal := &traversal{
|
||||
opts: placementOptions,
|
||||
extraBroadcastEnabled: len(obj.Children()) > 0 ||
|
||||
(!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)),
|
||||
mtx: sync.RWMutex{},
|
||||
mExclude: make(map[string]struct{}),
|
||||
}
|
||||
signer := &putSingleRequestSigner{
|
||||
req: req,
|
||||
keyStorage: s.keyStorage,
|
||||
signer: &sync.Once{},
|
||||
}
|
||||
return s.saveAccordingToPlacement(ctx, obj, signer, traversal)
|
||||
}
|
||||
|
||||
func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) {
|
||||
var result []placement.Option
|
||||
if len(copiesNumber) > 0 {
|
||||
result = append(result, placement.WithCopyNumbers(copiesNumber))
|
||||
}
|
||||
|
||||
cnrID, ok := obj.ContainerID()
|
||||
if !ok {
|
||||
return nil, errors.New("missing container ID")
|
||||
}
|
||||
cnrInfo, err := s.cnrSrc.Get(cnrID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get container by ID: %w", err)
|
||||
}
|
||||
result = append(result, placement.ForContainer(cnrInfo.Value))
|
||||
|
||||
objID, ok := obj.ID()
|
||||
if !ok {
|
||||
return nil, errors.New("missing object ID")
|
||||
}
|
||||
result = append(result, placement.ForObject(objID))
|
||||
|
||||
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get latest network map: %w", err)
|
||||
}
|
||||
builder := placement.NewNetworkMapBuilder(latestNetmap)
|
||||
if localOnly {
|
||||
result = append(result, placement.SuccessAfter(1))
|
||||
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
|
||||
}
|
||||
result = append(result, placement.UseBuilder(builder))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.Object, signer *putSingleRequestSigner, traversal *traversal) error {
|
||||
traverser, err := placement.NewTraverser(traversal.opts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create object placement traverser: %w", err)
|
||||
}
|
||||
|
||||
var resultError atomic.Value
|
||||
for {
|
||||
addrs := traverser.Next()
|
||||
if len(addrs) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if stop := s.saveToPlacementNodes(ctx, obj, signer, traversal, traverser, addrs, &resultError); stop {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !traverser.Success() {
|
||||
var err errIncompletePut
|
||||
err.singleErr, _ = resultError.Load().(error)
|
||||
return err
|
||||
}
|
||||
|
||||
if traversal.submitPrimaryPlacementFinish() {
|
||||
err = s.saveAccordingToPlacement(ctx, obj, signer, traversal)
|
||||
if err != nil {
|
||||
s.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) saveToPlacementNodes(ctx context.Context,
|
||||
obj *objectSDK.Object,
|
||||
signer *putSingleRequestSigner,
|
||||
traversal *traversal,
|
||||
traverser *placement.Traverser,
|
||||
nodeAddresses []placement.Node,
|
||||
resultError *atomic.Value,
|
||||
) bool {
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
for _, nodeAddress := range nodeAddresses {
|
||||
nodeAddress := nodeAddress
|
||||
if traversal.processed(nodeAddress) {
|
||||
continue
|
||||
}
|
||||
|
||||
local := false
|
||||
workerPool := s.remotePool
|
||||
if s.netmapKeys.IsLocalKey(nodeAddress.PublicKey()) {
|
||||
local = true
|
||||
workerPool = s.localPool
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
if err := workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
|
||||
err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer)
|
||||
|
||||
traversal.submitProcessed(nodeAddress)
|
||||
|
||||
if err != nil {
|
||||
resultError.Store(err)
|
||||
svcutil.LogServiceError(s.log, "PUT", nodeAddress.Addresses(), err)
|
||||
return
|
||||
}
|
||||
|
||||
traverser.SubmitSuccess()
|
||||
}); err != nil {
|
||||
wg.Done()
|
||||
svcutil.LogWorkerPoolError(s.log, "PUT", err)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object, signer *putSingleRequestSigner) error {
|
||||
if nodeDesc.local {
|
||||
return s.localStore.Put(ctx, obj)
|
||||
}
|
||||
|
||||
var info client.NodeInfo
|
||||
|
||||
client.NodeInfoFromNetmapElement(&info, nodeDesc.info)
|
||||
|
||||
c, err := s.clientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||
}
|
||||
|
||||
return s.redirectPutSingleRequest(ctx, signer, obj, info, c)
|
||||
}
|
||||
|
||||
func (s *Service) redirectPutSingleRequest(ctx context.Context,
|
||||
signer *putSingleRequestSigner,
|
||||
obj *objectSDK.Object,
|
||||
info client.NodeInfo,
|
||||
c client.MultiAddressClient) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest")
|
||||
defer span.End()
|
||||
|
||||
var req *objectAPI.PutSingleRequest
|
||||
var firstErr error
|
||||
req, firstErr = signer.GetRequestWithSignedHeader()
|
||||
if firstErr != nil {
|
||||
return firstErr
|
||||
}
|
||||
|
||||
info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest.IterateAddresses",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", addr.String()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var err error
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
objID, _ := obj.ID()
|
||||
cnrID, _ := obj.ContainerID()
|
||||
s.log.Warn("failed to redirect PutSingle request",
|
||||
zap.Error(err),
|
||||
zap.Stringer("address", addr),
|
||||
zap.Stringer("object_id", objID),
|
||||
zap.Stringer("container_id", cnrID),
|
||||
)
|
||||
}
|
||||
|
||||
stop = err == nil
|
||||
if stop || firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}()
|
||||
|
||||
var resp *objectAPI.PutSingleResponse
|
||||
|
||||
err = c.RawForAddress(ctx, addr, func(cli *rawclient.Client) error {
|
||||
var e error
|
||||
resp, e = rpc.PutSingleObject(cli, req, rawclient.WithContext(ctx))
|
||||
return e
|
||||
})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to execute request: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = internal.VerifyResponseKeyV2(info.PublicKey(), resp); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = signature.VerifyServiceMessage(resp)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
return
|
||||
})
|
||||
|
||||
return firstErr
|
||||
}
|
|
@ -49,8 +49,8 @@ func (s *Service) Put() (object.PutObjectStream, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) PutSingle(context.Context, *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) {
|
||||
return nil, fmt.Errorf("unimplemented") //TODO
|
||||
func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) {
|
||||
return s.svc.PutSingle(ctx, req)
|
||||
}
|
||||
|
||||
func WithInternalService(v *putsvc.Service) Option {
|
||||
|
|
Loading…
Reference in a new issue