From 2adbe29f7ff904087a4a9b743bfadc0e4b664508 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 24 Feb 2022 14:33:43 +0300 Subject: [PATCH] [#149] client: Fill buffer completely when reading stream data In previous implementation all `Read` methods read single response per-call, so buffer could be incompletely filled w/o an error. In order to follow `io.Reader` docs we need to continue filling the buffer while it is possible. Signed-off-by: Leonard Lyubich --- client/object_get.go | 133 +++++++++++++++++++++++----------------- client/object_search.go | 10 +-- 2 files changed, 83 insertions(+), 60 deletions(-) diff --git a/client/object_get.go b/client/object_get.go index 5f67c915..98a4fbf0 100644 --- a/client/object_get.go +++ b/client/object_get.go @@ -166,34 +166,47 @@ func (x *ObjectReader) readChunk(buf []byte) (int, bool) { return read, true } - // receive next message - ok := x.ctxCall.readResponse() - if !ok { - return read, false + var ok bool + var part v2object.GetObjectPart + var chunk []byte + var lastRead int + + for { + // receive next message + ok = x.ctxCall.readResponse() + if !ok { + return read, false + } + + // get chunk part message + part = x.bodyResp.GetObjectPart() + + var partChunk *v2object.GetObjectPartChunk + + partChunk, ok = part.(*v2object.GetObjectPartChunk) + if !ok { + x.ctxCall.err = fmt.Errorf("unexpected message instead of chunk part: %T", part) + return read, false + } + + // read new chunk + chunk = partChunk.GetChunk() + if len(chunk) == 0 { + // just skip empty chunks since they are not prohibited by protocol + continue + } + + lastRead = copy(buf[read:], chunk) + + read += lastRead + + if read == len(buf) { + // save the tail + x.tailPayload = append(x.tailPayload, chunk[lastRead:]...) + + return read, true + } } - - // get chunk part message - part := x.bodyResp.GetObjectPart() - - var partChunk *v2object.GetObjectPartChunk - - partChunk, ok = part.(*v2object.GetObjectPartChunk) - if !ok { - x.ctxCall.err = fmt.Errorf("unexpected message instead of chunk part: %T", part) - return read, false - } - - // read new chunk - chunk := partChunk.GetChunk() - - tailOffset := copy(buf[read:], chunk) - - read += tailOffset - - // save the tail - x.tailPayload = append(x.tailPayload, chunk[tailOffset:]...) - - return read, true } // ReadChunk reads another chunk of the object payload. Works similar to @@ -593,37 +606,47 @@ func (x *ObjectRangeReader) readChunk(buf []byte) (int, bool) { return read, true } - // receive next message - ok := x.ctxCall.readResponse() - if !ok { - return read, false - } - - // get chunk message + var ok bool var partChunk *v2object.GetRangePartChunk + var chunk []byte + var lastRead int - switch v := x.bodyResp.GetRangePart().(type) { - default: - x.ctxCall.err = fmt.Errorf("unexpected message received: %T", v) - return read, false - case *v2object.SplitInfo: - handleSplitInfo(&x.ctxCall, v) - return read, false - case *v2object.GetRangePartChunk: - partChunk = v + for { + // receive next message + ok = x.ctxCall.readResponse() + if !ok { + return read, false + } + + // get chunk message + switch v := x.bodyResp.GetRangePart().(type) { + default: + x.ctxCall.err = fmt.Errorf("unexpected message received: %T", v) + return read, false + case *v2object.SplitInfo: + handleSplitInfo(&x.ctxCall, v) + return read, false + case *v2object.GetRangePartChunk: + partChunk = v + } + + chunk = partChunk.GetChunk() + if len(chunk) == 0 { + // just skip empty chunks since they are not prohibited by protocol + continue + } + + lastRead = copy(buf[read:], chunk) + + read += lastRead + + if read == len(buf) { + // save the tail + x.tailPayload = append(x.tailPayload, chunk[lastRead:]...) + + return read, true + } } - - // read new chunk - chunk := partChunk.GetChunk() - - tailOffset := copy(buf[read:], chunk) - - read += tailOffset - - // save the tail - x.tailPayload = append(x.tailPayload, chunk[tailOffset:]...) - - return read, true } // ReadChunk reads another chunk of the object payload range. diff --git a/client/object_search.go b/client/object_search.go index fc7fcdc0..cd79597b 100644 --- a/client/object_search.go +++ b/client/object_search.go @@ -166,13 +166,13 @@ func (x *ObjectListReader) Read(buf []oid.ID) (int, bool) { read += ln - // save the tail and break - x.tail = append(x.tail, ids[ln:]...) + if read == len(buf) { + // save the tail + x.tail = append(x.tail, ids[ln:]...) - break + return read, true + } } - - return read, true } // Close ends reading list of the matched objects and returns the result of the operation