From fcbf90d31b7c932fda251fc4d186c07b7ccf1aba Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 4 Jul 2023 15:12:59 +0300 Subject: [PATCH] [#486] node: Add PutSingle implemetation Signed-off-by: Dmitrii Stepanov --- pkg/services/object/put/single.go | 369 ++++++++++++++++++++++++++ pkg/services/object/put/v2/service.go | 4 +- 2 files changed, 371 insertions(+), 2 deletions(-) create mode 100644 pkg/services/object/put/single.go diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go new file mode 100644 index 000000000..32c1ca73a --- /dev/null +++ b/pkg/services/object/put/single.go @@ -0,0 +1,369 @@ +package putsvc + +import ( + "bytes" + "context" + "crypto/sha256" + "errors" + "fmt" + "hash" + "sync" + "sync/atomic" + + objectAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" + rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" + svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/tzhash/tz" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +type putSingleRequestSigner struct { + req *objectAPI.PutSingleRequest + keyStorage *svcutil.KeyStorage + signer *sync.Once +} + +func (s *putSingleRequestSigner) GetRequestWithSignedHeader() (*objectAPI.PutSingleRequest, error) { + var resErr error + s.signer.Do(func() { + metaHdr := new(sessionV2.RequestMetaHeader) + meta := s.req.GetMetaHeader() + + metaHdr.SetTTL(meta.GetTTL() - 1) + metaHdr.SetOrigin(meta) + s.req.SetMetaHeader(metaHdr) + + privateKey, err := s.keyStorage.GetKey(nil) + if err != nil { + resErr = err + return + } + resErr = signature.SignServiceMessage(privateKey, s.req) + }) + return s.req, resErr +} + +func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "putsvc.PutSingle") + defer span.End() + + obj := objectSDK.NewFromV2(req.GetBody().GetObject()) + + if err := s.validatePutSingle(ctx, obj); err != nil { + return nil, err + } + + if err := s.saveToNodes(ctx, obj, req); err != nil { + return nil, err + } + + resp := &objectAPI.PutSingleResponse{} + resp.SetBody(&objectAPI.PutSingleResponseBody{}) + return resp, nil +} + +func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) error { + if err := s.validarePutSingleSize(obj); err != nil { + return err + } + + if err := s.validatePutSingleChecksum(obj); err != nil { + return err + } + + return s.validatePutSingleObject(ctx, obj) +} + +func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error { + if uint64(len(obj.Payload())) != obj.PayloadSize() { + return ErrWrongPayloadSize + } + + maxAllowedSize := s.maxSizeSrc.MaxObjectSize() + if obj.PayloadSize() > maxAllowedSize { + return ErrExceedingMaxSize + } + + return nil +} + +func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error { + cs, csSet := obj.PayloadChecksum() + if !csSet { + return errors.New("missing payload checksum") + } + + var hash hash.Hash + + switch typ := cs.Type(); typ { + default: + return fmt.Errorf("unsupported payload checksum type %v", typ) + case checksum.SHA256: + hash = sha256.New() + case checksum.TZ: + hash = tz.New() + } + + if _, err := hash.Write(obj.Payload()); err != nil { + return fmt.Errorf("could not compute payload hash: %w", err) + } + + if !bytes.Equal(hash.Sum(nil), cs.Value()) { + return fmt.Errorf("incorrect payload checksum") + } + + return nil +} + +func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) error { + if err := s.fmtValidator.Validate(ctx, obj, false); err != nil { + return fmt.Errorf("coult not validate object format: %w", err) + } + + _, err := s.fmtValidator.ValidateContent(obj) + if err != nil { + return fmt.Errorf("could not validate payload content: %w", err) + } + + return nil +} + +func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *objectAPI.PutSingleRequest) error { + localOnly := req.GetMetaHeader().GetTTL() <= 1 + placementOptions, err := s.getPutSinglePlacementOptions(obj, req.GetBody().GetCopiesNumber(), localOnly) + if err != nil { + return err + } + traversal := &traversal{ + opts: placementOptions, + extraBroadcastEnabled: len(obj.Children()) > 0 || + (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)), + mtx: sync.RWMutex{}, + mExclude: make(map[string]struct{}), + } + signer := &putSingleRequestSigner{ + req: req, + keyStorage: s.keyStorage, + signer: &sync.Once{}, + } + return s.saveAccordingToPlacement(ctx, obj, signer, traversal) +} + +func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumber []uint32, localOnly bool) ([]placement.Option, error) { + var result []placement.Option + if len(copiesNumber) > 0 { + result = append(result, placement.WithCopyNumbers(copiesNumber)) + } + + cnrID, ok := obj.ContainerID() + if !ok { + return nil, errors.New("missing container ID") + } + cnrInfo, err := s.cnrSrc.Get(cnrID) + if err != nil { + return nil, fmt.Errorf("could not get container by ID: %w", err) + } + result = append(result, placement.ForContainer(cnrInfo.Value)) + + objID, ok := obj.ID() + if !ok { + return nil, errors.New("missing object ID") + } + result = append(result, placement.ForObject(objID)) + + latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc) + if err != nil { + return nil, fmt.Errorf("could not get latest network map: %w", err) + } + builder := placement.NewNetworkMapBuilder(latestNetmap) + if localOnly { + result = append(result, placement.SuccessAfter(1)) + builder = svcutil.NewLocalPlacement(builder, s.netmapKeys) + } + result = append(result, placement.UseBuilder(builder)) + return result, nil +} + +func (s *Service) saveAccordingToPlacement(ctx context.Context, obj *objectSDK.Object, signer *putSingleRequestSigner, traversal *traversal) error { + traverser, err := placement.NewTraverser(traversal.opts...) + if err != nil { + return fmt.Errorf("could not create object placement traverser: %w", err) + } + + var resultError atomic.Value + for { + addrs := traverser.Next() + if len(addrs) == 0 { + break + } + + if stop := s.saveToPlacementNodes(ctx, obj, signer, traversal, traverser, addrs, &resultError); stop { + break + } + } + + if !traverser.Success() { + var err errIncompletePut + err.singleErr, _ = resultError.Load().(error) + return err + } + + if traversal.submitPrimaryPlacementFinish() { + err = s.saveAccordingToPlacement(ctx, obj, signer, traversal) + if err != nil { + s.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err)) + } + } + + return nil +} + +func (s *Service) saveToPlacementNodes(ctx context.Context, + obj *objectSDK.Object, + signer *putSingleRequestSigner, + traversal *traversal, + traverser *placement.Traverser, + nodeAddresses []placement.Node, + resultError *atomic.Value, +) bool { + wg := sync.WaitGroup{} + + for _, nodeAddress := range nodeAddresses { + nodeAddress := nodeAddress + if traversal.processed(nodeAddress) { + continue + } + + local := false + workerPool := s.remotePool + if s.netmapKeys.IsLocalKey(nodeAddress.PublicKey()) { + local = true + workerPool = s.localPool + } + + wg.Add(1) + if err := workerPool.Submit(func() { + defer wg.Done() + + err := s.saveToPlacementNode(ctx, &nodeDesc{local: local, info: nodeAddress}, obj, signer) + + traversal.submitProcessed(nodeAddress) + + if err != nil { + resultError.Store(err) + svcutil.LogServiceError(s.log, "PUT", nodeAddress.Addresses(), err) + return + } + + traverser.SubmitSuccess() + }); err != nil { + wg.Done() + svcutil.LogWorkerPoolError(s.log, "PUT", err) + return true + } + } + + wg.Wait() + + return false +} + +func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object, signer *putSingleRequestSigner) error { + if nodeDesc.local { + return s.localStore.Put(ctx, obj) + } + + var info client.NodeInfo + + client.NodeInfoFromNetmapElement(&info, nodeDesc.info) + + c, err := s.clientConstructor.Get(info) + if err != nil { + return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) + } + + return s.redirectPutSingleRequest(ctx, signer, obj, info, c) +} + +func (s *Service) redirectPutSingleRequest(ctx context.Context, + signer *putSingleRequestSigner, + obj *objectSDK.Object, + info client.NodeInfo, + c client.MultiAddressClient) error { + ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest") + defer span.End() + + var req *objectAPI.PutSingleRequest + var firstErr error + req, firstErr = signer.GetRequestWithSignedHeader() + if firstErr != nil { + return firstErr + } + + info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) { + ctx, span := tracing.StartSpanFromContext(ctx, "putService.redirectPutSingleRequest.IterateAddresses", + trace.WithAttributes( + attribute.String("address", addr.String()), + )) + defer span.End() + + var err error + + defer func() { + if err != nil { + objID, _ := obj.ID() + cnrID, _ := obj.ContainerID() + s.log.Warn("failed to redirect PutSingle request", + zap.Error(err), + zap.Stringer("address", addr), + zap.Stringer("object_id", objID), + zap.Stringer("container_id", cnrID), + ) + } + + stop = err == nil + if stop || firstErr == nil { + firstErr = err + } + }() + + var resp *objectAPI.PutSingleResponse + + err = c.RawForAddress(ctx, addr, func(cli *rawclient.Client) error { + var e error + resp, e = rpc.PutSingleObject(cli, req, rawclient.WithContext(ctx)) + return e + }) + if err != nil { + err = fmt.Errorf("failed to execute request: %w", err) + return + } + + if err = internal.VerifyResponseKeyV2(info.PublicKey(), resp); err != nil { + return + } + + err = signature.VerifyServiceMessage(resp) + if err != nil { + err = fmt.Errorf("response verification failed: %w", err) + } + + return + }) + + return firstErr +} diff --git a/pkg/services/object/put/v2/service.go b/pkg/services/object/put/v2/service.go index 5af62cd40..78655edc7 100644 --- a/pkg/services/object/put/v2/service.go +++ b/pkg/services/object/put/v2/service.go @@ -49,8 +49,8 @@ func (s *Service) Put() (object.PutObjectStream, error) { }, nil } -func (s *Service) PutSingle(context.Context, *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) { - return nil, fmt.Errorf("unimplemented") //TODO +func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) { + return s.svc.PutSingle(ctx, req) } func WithInternalService(v *putsvc.Service) Option {