[#2047] node: Do not send chunk twice on request forwarding

That could happen if a node forwards request to a node that closed the
connection during the original object stream.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-11-30 20:36:34 +03:00 committed by fyrchik
parent bd25db5d4a
commit d54022eacc
2 changed files with 50 additions and 5 deletions

View file

@ -48,6 +48,7 @@ Changelog for NeoFS Node
- Making notary deposits with a zero GAS balance (#2080)
- Notary requests on shutdown (#2075)
- `neofs-cli container create ` check the sufficiency of the number of nodes in the selector for replicas (#2038)
- Data duplication during request forwarding (#2047)
### Removed
- `-g` option from `neofs-cli control ...` and `neofs-cli container create` commands (#2089)

View file

@ -66,6 +66,9 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
if !commonPrm.LocalOnly() {
var onceResign sync.Once
var onceHeaderSending sync.Once
var globalProgress int
p.SetRequestForwarder(groupAddressRequestForwarder(func(addr network.Address, c client.MultiAddressClient, pubkey []byte) (*object.Object, error) {
var err error
@ -108,6 +111,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
var (
headWas bool
resp = new(objectV2.GetResponse)
localProgress int
)
for {
@ -155,7 +159,10 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
obj.SetSignature(v.GetSignature())
obj.SetHeader(v.GetHeader())
if err = streamWrapper.WriteHeader(object.NewFromV2(obj)); err != nil {
onceHeaderSending.Do(func() {
err = streamWrapper.WriteHeader(object.NewFromV2(obj))
})
if err != nil {
return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
case *objectV2.GetObjectPartChunk:
@ -163,9 +170,20 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
return nil, errWrongMessageSeq
}
if err = streamWrapper.WriteChunk(v.GetChunk()); err != nil {
origChunk := v.GetChunk()
chunk := chunkToSend(globalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = streamWrapper.WriteChunk(chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
localProgress += len(origChunk)
globalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
return nil, object.NewSplitInfoError(si)
@ -213,6 +231,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
if !commonPrm.LocalOnly() {
var onceResign sync.Once
var globalProgress int
key, err := s.keyStorage.GetKey(nil)
if err != nil {
@ -254,6 +273,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
}
resp := new(objectV2.GetRangeResponse)
var localProgress int
for {
// receive message from server stream
@ -284,9 +304,20 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
case nil:
return nil, fmt.Errorf("unexpected range type %T", v)
case *objectV2.GetRangePartChunk:
if err = streamWrapper.WriteChunk(v.GetChunk()); err != nil {
origChunk := v.GetChunk()
chunk := chunkToSend(globalProgress, localProgress, origChunk)
if len(chunk) == 0 {
localProgress += len(origChunk)
continue
}
if err = streamWrapper.WriteChunk(chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
}
localProgress += len(origChunk)
globalProgress += len(chunk)
case *objectV2.SplitInfo:
si := object.NewSplitInfoFromV2(v)
@ -646,3 +677,16 @@ func checkStatus(stV2 *status.Status) error {
return nil
}
func chunkToSend(global, local int, chunk []byte) []byte {
if global == local {
return chunk
}
if local+len(chunk) <= global {
// chunk has already been sent
return nil
}
return chunk[global-local:]
}