From d54022eacc70271c1f4c90c5b423c6aa361aa8f0 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 30 Nov 2022 20:36:34 +0300 Subject: [PATCH] [#2047] node: Do not send chunk twice on request forwarding That could happen if a node forwards request to a node that closed the connection during the original object stream. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/services/object/get/v2/util.go | 54 +++++++++++++++++++++++++++--- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 992be304..61e332f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ Changelog for NeoFS Node - Making notary deposits with a zero GAS balance (#2080) - Notary requests on shutdown (#2075) - `neofs-cli container create ` check the sufficiency of the number of nodes in the selector for replicas (#2038) +- Data duplication during request forwarding (#2047) ### Removed - `-g` option from `neofs-cli control ...` and `neofs-cli container create` commands (#2089) diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 17ac8d63..eb953e61 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -66,6 +66,9 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre if !commonPrm.LocalOnly() { var onceResign sync.Once + var onceHeaderSending sync.Once + var globalProgress int + p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) { var err error @@ -106,8 +109,9 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre } var ( - headWas bool - resp = new(objectV2.GetResponse) + headWas bool + resp = new(objectV2.GetResponse) + localProgress int ) for { @@ -155,7 +159,10 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre obj.SetSignature(v.GetSignature()) obj.SetHeader(v.GetHeader()) - if err = streamWrapper.WriteHeader(object.NewFromV2(obj)); err != nil { + onceHeaderSending.Do(func() { + err = streamWrapper.WriteHeader(object.NewFromV2(obj)) + }) + if err != nil { return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err) } case *objectV2.GetObjectPartChunk: @@ -163,9 +170,20 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre return nil, errWrongMessageSeq } - if err = streamWrapper.WriteChunk(v.GetChunk()); err != nil { + origChunk := v.GetChunk() + + chunk := chunkToSend(globalProgress, localProgress, origChunk) + if len(chunk) == 0 { + localProgress += len(origChunk) + continue + } + + if err = streamWrapper.WriteChunk(chunk); err != nil { return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err) } + + localProgress += len(origChunk) + globalProgress += len(chunk) case *objectV2.SplitInfo: si := object.NewSplitInfoFromV2(v) return nil, object.NewSplitInfoError(si) @@ -213,6 +231,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get if !commonPrm.LocalOnly() { var onceResign sync.Once + var globalProgress int key, err := s.keyStorage.GetKey(nil) if err != nil { @@ -254,6 +273,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get } resp := new(objectV2.GetRangeResponse) + var localProgress int for { // receive message from server stream @@ -284,9 +304,20 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get case nil: return nil, fmt.Errorf("unexpected range type %T", v) case *objectV2.GetRangePartChunk: - if err = streamWrapper.WriteChunk(v.GetChunk()); err != nil { + origChunk := v.GetChunk() + + chunk := chunkToSend(globalProgress, localProgress, origChunk) + if len(chunk) == 0 { + localProgress += len(origChunk) + continue + } + + if err = streamWrapper.WriteChunk(chunk); err != nil { return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err) } + + localProgress += len(origChunk) + globalProgress += len(chunk) case *objectV2.SplitInfo: si := object.NewSplitInfoFromV2(v) @@ -646,3 +677,16 @@ func checkStatus(stV2 *status.Status) error { return nil } + +func chunkToSend(global, local int, chunk []byte) []byte { + if global == local { + return chunk + } + + if local+len(chunk) <= global { + // chunk has already been sent + return nil + } + + return chunk[global-local:] +}