forked from TrueCloudLab/frostfs-sdk-go
[#131] client: Return io.ErrUnexpectedEOF
on cut object payload
Make `ObjectReader` / `ObjectRangeReader` to track number of bytes read and return: * `io.ErrUnexpectedEOF` if the stream finished by the server before the last byte of the payload was read. * an error if more object payload size bytes was read. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
d90fe8fbab
commit
a0cbc3d8be
1 changed files with 28 additions and 4 deletions
|
@ -107,6 +107,8 @@ type ObjectReader struct {
|
||||||
bodyResp v2object.GetResponseBody
|
bodyResp v2object.GetResponseBody
|
||||||
|
|
||||||
tailPayload []byte
|
tailPayload []byte
|
||||||
|
|
||||||
|
remainingPayloadLen int
|
||||||
}
|
}
|
||||||
|
|
||||||
// UseKey specifies private key to sign the requests.
|
// UseKey specifies private key to sign the requests.
|
||||||
|
@ -147,6 +149,8 @@ func (x *ObjectReader) ReadHeader(dst *object.Object) bool {
|
||||||
objv2.SetHeader(partInit.GetHeader())
|
objv2.SetHeader(partInit.GetHeader())
|
||||||
objv2.SetSignature(partInit.GetSignature())
|
objv2.SetSignature(partInit.GetSignature())
|
||||||
|
|
||||||
|
x.remainingPayloadLen = int(objv2.GetHeader().GetPayloadLength())
|
||||||
|
|
||||||
*dst = *object.NewFromV2(&objv2) // need smth better
|
*dst = *object.NewFromV2(&objv2) // need smth better
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
@ -208,6 +212,8 @@ func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) {
|
||||||
if x.ctxCall.err != nil {
|
if x.ctxCall.err != nil {
|
||||||
if !errors.Is(x.ctxCall.err, io.EOF) {
|
if !errors.Is(x.ctxCall.err, io.EOF) {
|
||||||
return nil, x.ctxCall.err
|
return nil, x.ctxCall.err
|
||||||
|
} else if x.remainingPayloadLen > 0 {
|
||||||
|
return nil, io.ErrUnexpectedEOF
|
||||||
} else if !ignoreEOF {
|
} else if !ignoreEOF {
|
||||||
return nil, io.EOF
|
return nil, io.EOF
|
||||||
}
|
}
|
||||||
|
@ -239,11 +245,17 @@ func (x *ObjectReader) Read(p []byte) (int, error) {
|
||||||
res, err := x.close(false)
|
res, err := x.close(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err
|
return n, err
|
||||||
} else if !x.ctxCall.resolveAPIFailures {
|
|
||||||
return n, apistatus.ErrFromStatus(res.Status())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return n, apistatus.ErrFromStatus(res.Status())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if n > x.remainingPayloadLen {
|
||||||
|
return n, errors.New("payload size overflow")
|
||||||
|
}
|
||||||
|
|
||||||
|
x.remainingPayloadLen -= n
|
||||||
|
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -497,6 +509,8 @@ type ObjectRangeReader struct {
|
||||||
bodyResp v2object.GetRangeResponseBody
|
bodyResp v2object.GetRangeResponseBody
|
||||||
|
|
||||||
tailPayload []byte
|
tailPayload []byte
|
||||||
|
|
||||||
|
remainingPayloadLen int
|
||||||
}
|
}
|
||||||
|
|
||||||
// UseKey specifies private key to sign the requests.
|
// UseKey specifies private key to sign the requests.
|
||||||
|
@ -572,6 +586,8 @@ func (x *ObjectRangeReader) close(ignoreEOF bool) (*ResObjectRange, error) {
|
||||||
if x.ctxCall.err != nil {
|
if x.ctxCall.err != nil {
|
||||||
if !errors.Is(x.ctxCall.err, io.EOF) {
|
if !errors.Is(x.ctxCall.err, io.EOF) {
|
||||||
return nil, x.ctxCall.err
|
return nil, x.ctxCall.err
|
||||||
|
} else if x.remainingPayloadLen > 0 {
|
||||||
|
return nil, io.ErrUnexpectedEOF
|
||||||
} else if !ignoreEOF {
|
} else if !ignoreEOF {
|
||||||
return nil, io.EOF
|
return nil, io.EOF
|
||||||
}
|
}
|
||||||
|
@ -603,11 +619,17 @@ func (x *ObjectRangeReader) Read(p []byte) (int, error) {
|
||||||
res, err := x.close(false)
|
res, err := x.close(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err
|
return n, err
|
||||||
} else if !x.ctxCall.resolveAPIFailures {
|
|
||||||
return n, apistatus.ErrFromStatus(res.Status())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return n, apistatus.ErrFromStatus(res.Status())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if n > x.remainingPayloadLen {
|
||||||
|
return n, errors.New("payload range size overflow")
|
||||||
|
}
|
||||||
|
|
||||||
|
x.remainingPayloadLen -= n
|
||||||
|
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -677,6 +699,8 @@ func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*Obje
|
||||||
stream *rpcapi.ObjectRangeResponseReader
|
stream *rpcapi.ObjectRangeResponseReader
|
||||||
)
|
)
|
||||||
|
|
||||||
|
r.remainingPayloadLen = int(prm.ln)
|
||||||
|
|
||||||
ctx, r.cancelCtxStream = context.WithCancel(ctx)
|
ctx, r.cancelCtxStream = context.WithCancel(ctx)
|
||||||
|
|
||||||
resp.SetBody(&r.bodyResp)
|
resp.SetBody(&r.bodyResp)
|
||||||
|
|
Loading…
Reference in a new issue