forked from TrueCloudLab/frostfs-sdk-go
[#149] client: Fill buffer completely when reading stream data
In previous implementation all `Read` methods read single response per-call, so buffer could be incompletely filled w/o an error. In order to follow `io.Reader` docs we need to continue filling the buffer while it is possible. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
8e99e8a818
commit
2adbe29f7f
2 changed files with 83 additions and 60 deletions
|
@ -166,34 +166,47 @@ func (x *ObjectReader) readChunk(buf []byte) (int, bool) {
|
||||||
return read, true
|
return read, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// receive next message
|
var ok bool
|
||||||
ok := x.ctxCall.readResponse()
|
var part v2object.GetObjectPart
|
||||||
if !ok {
|
var chunk []byte
|
||||||
return read, false
|
var lastRead int
|
||||||
|
|
||||||
|
for {
|
||||||
|
// receive next message
|
||||||
|
ok = x.ctxCall.readResponse()
|
||||||
|
if !ok {
|
||||||
|
return read, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// get chunk part message
|
||||||
|
part = x.bodyResp.GetObjectPart()
|
||||||
|
|
||||||
|
var partChunk *v2object.GetObjectPartChunk
|
||||||
|
|
||||||
|
partChunk, ok = part.(*v2object.GetObjectPartChunk)
|
||||||
|
if !ok {
|
||||||
|
x.ctxCall.err = fmt.Errorf("unexpected message instead of chunk part: %T", part)
|
||||||
|
return read, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// read new chunk
|
||||||
|
chunk = partChunk.GetChunk()
|
||||||
|
if len(chunk) == 0 {
|
||||||
|
// just skip empty chunks since they are not prohibited by protocol
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
lastRead = copy(buf[read:], chunk)
|
||||||
|
|
||||||
|
read += lastRead
|
||||||
|
|
||||||
|
if read == len(buf) {
|
||||||
|
// save the tail
|
||||||
|
x.tailPayload = append(x.tailPayload, chunk[lastRead:]...)
|
||||||
|
|
||||||
|
return read, true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// get chunk part message
|
|
||||||
part := x.bodyResp.GetObjectPart()
|
|
||||||
|
|
||||||
var partChunk *v2object.GetObjectPartChunk
|
|
||||||
|
|
||||||
partChunk, ok = part.(*v2object.GetObjectPartChunk)
|
|
||||||
if !ok {
|
|
||||||
x.ctxCall.err = fmt.Errorf("unexpected message instead of chunk part: %T", part)
|
|
||||||
return read, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// read new chunk
|
|
||||||
chunk := partChunk.GetChunk()
|
|
||||||
|
|
||||||
tailOffset := copy(buf[read:], chunk)
|
|
||||||
|
|
||||||
read += tailOffset
|
|
||||||
|
|
||||||
// save the tail
|
|
||||||
x.tailPayload = append(x.tailPayload, chunk[tailOffset:]...)
|
|
||||||
|
|
||||||
return read, true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadChunk reads another chunk of the object payload. Works similar to
|
// ReadChunk reads another chunk of the object payload. Works similar to
|
||||||
|
@ -593,37 +606,47 @@ func (x *ObjectRangeReader) readChunk(buf []byte) (int, bool) {
|
||||||
return read, true
|
return read, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// receive next message
|
var ok bool
|
||||||
ok := x.ctxCall.readResponse()
|
|
||||||
if !ok {
|
|
||||||
return read, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// get chunk message
|
|
||||||
var partChunk *v2object.GetRangePartChunk
|
var partChunk *v2object.GetRangePartChunk
|
||||||
|
var chunk []byte
|
||||||
|
var lastRead int
|
||||||
|
|
||||||
switch v := x.bodyResp.GetRangePart().(type) {
|
for {
|
||||||
default:
|
// receive next message
|
||||||
x.ctxCall.err = fmt.Errorf("unexpected message received: %T", v)
|
ok = x.ctxCall.readResponse()
|
||||||
return read, false
|
if !ok {
|
||||||
case *v2object.SplitInfo:
|
return read, false
|
||||||
handleSplitInfo(&x.ctxCall, v)
|
}
|
||||||
return read, false
|
|
||||||
case *v2object.GetRangePartChunk:
|
// get chunk message
|
||||||
partChunk = v
|
switch v := x.bodyResp.GetRangePart().(type) {
|
||||||
|
default:
|
||||||
|
x.ctxCall.err = fmt.Errorf("unexpected message received: %T", v)
|
||||||
|
return read, false
|
||||||
|
case *v2object.SplitInfo:
|
||||||
|
handleSplitInfo(&x.ctxCall, v)
|
||||||
|
return read, false
|
||||||
|
case *v2object.GetRangePartChunk:
|
||||||
|
partChunk = v
|
||||||
|
}
|
||||||
|
|
||||||
|
chunk = partChunk.GetChunk()
|
||||||
|
if len(chunk) == 0 {
|
||||||
|
// just skip empty chunks since they are not prohibited by protocol
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
lastRead = copy(buf[read:], chunk)
|
||||||
|
|
||||||
|
read += lastRead
|
||||||
|
|
||||||
|
if read == len(buf) {
|
||||||
|
// save the tail
|
||||||
|
x.tailPayload = append(x.tailPayload, chunk[lastRead:]...)
|
||||||
|
|
||||||
|
return read, true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// read new chunk
|
|
||||||
chunk := partChunk.GetChunk()
|
|
||||||
|
|
||||||
tailOffset := copy(buf[read:], chunk)
|
|
||||||
|
|
||||||
read += tailOffset
|
|
||||||
|
|
||||||
// save the tail
|
|
||||||
x.tailPayload = append(x.tailPayload, chunk[tailOffset:]...)
|
|
||||||
|
|
||||||
return read, true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadChunk reads another chunk of the object payload range.
|
// ReadChunk reads another chunk of the object payload range.
|
||||||
|
|
|
@ -166,13 +166,13 @@ func (x *ObjectListReader) Read(buf []oid.ID) (int, bool) {
|
||||||
|
|
||||||
read += ln
|
read += ln
|
||||||
|
|
||||||
// save the tail and break
|
if read == len(buf) {
|
||||||
x.tail = append(x.tail, ids[ln:]...)
|
// save the tail
|
||||||
|
x.tail = append(x.tail, ids[ln:]...)
|
||||||
|
|
||||||
break
|
return read, true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return read, true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close ends reading list of the matched objects and returns the result of the operation
|
// Close ends reading list of the matched objects and returns the result of the operation
|
||||||
|
|
Loading…
Reference in a new issue