From 06df8642b3c106186ce950352199edea782a30ab Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Wed, 5 Aug 2020 16:47:09 +0300 Subject: [PATCH] Simplify receiving object from NeoFS - add function to receive object with payload - simplify receiving object for layer.Get - simplify receiving object for layer.objectGet Signed-off-by: Evgeniy Kulikov --- neofs/layer/layer.go | 37 +------------- neofs/layer/object.go | 110 +++++++++++++++++++++++------------------- 2 files changed, 62 insertions(+), 85 deletions(-) diff --git a/neofs/layer/layer.go b/neofs/layer/layer.go index e02b5bd70..80c9c4683 100644 --- a/neofs/layer/layer.go +++ b/neofs/layer/layer.go @@ -124,42 +124,7 @@ func (n *layer) Get(ctx context.Context, address refs.Address) (*object.Object, return nil, err } - var ( - off int - obj *object.Object - ) - - for { - resp, err := cli.Recv() - if err != nil { - if err == io.EOF { - break - } - - return nil, err - } - - switch o := resp.R.(type) { - case *object.GetResponse_Object: - obj = o.Object - - _, hdr := obj.LastHeader(object.HeaderType(object.TombstoneHdr)) - if hdr != nil { - return nil, errors.New("object already removed") - } - - obj.Payload = make([]byte, obj.SystemHeader.PayloadLength) - case *object.GetResponse_Chunk: - if obj == nil { - return nil, errors.New("object headers not received") - } - off += copy(obj.Payload[off:], o.Chunk) - default: - return nil, errors.Errorf("unknown response %T", o) - } - } - - return obj, nil + return receiveObject(cli) } // GetBucketInfo returns bucket name. diff --git a/neofs/layer/object.go b/neofs/layer/object.go index f7d599588..bbc9434a1 100644 --- a/neofs/layer/object.go +++ b/neofs/layer/object.go @@ -1,7 +1,6 @@ package layer import ( - "bytes" "context" "io" "time" @@ -260,6 +259,54 @@ func (n *layer) objectHead(ctx context.Context, addr refs.Address) (*object.Obje return res.Object, nil } +func receiveObject(cli object.Service_GetClient) (*object.Object, error) { + var ( + off int + buf []byte + obj *object.Object + ) + + for { + resp, err := cli.Recv() + if err != nil { + if err == io.EOF { + break + } + + return nil, err + } + + switch o := resp.R.(type) { + case *object.GetResponse_Object: + + if _, hdr := o.Object.LastHeader(object.HeaderType(object.TombstoneHdr)); hdr != nil { + return nil, errors.New("object already removed") + } + + obj = o.Object + buf = make([]byte, obj.SystemHeader.PayloadLength) + + if len(obj.Payload) > 0 { + off += copy(buf, obj.Payload) + } + case *object.GetResponse_Chunk: + if obj == nil { + return nil, errors.New("object headers not received") + } + off += copy(buf[off:], o.Chunk) + default: + return nil, errors.Errorf("unknown response %T", o) + } + } + + if obj == nil { + return nil, errors.New("object headers not received") + } + obj.Payload = buf + + return obj, nil +} + // objectGet and write it into provided io.Reader. func (n *layer) objectGet(ctx context.Context, p getParams) (*object.Object, error) { conn, err := n.cli.GetConnection(ctx) @@ -294,58 +341,23 @@ func (n *layer) objectGet(ctx context.Context, p getParams) (*object.Object, err ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - getClient, err := object.NewServiceClient(conn).Get(ctx, req) - if err != nil { + var obj *object.Object + + if cli, err := object.NewServiceClient(conn).Get(ctx, req); err != nil { + return nil, err + } else if obj, err = receiveObject(cli); err != nil { + return nil, err + } else if ln := int64(obj.SystemHeader.PayloadLength); p.start+p.length > ln { + return nil, errors.Errorf("slice bounds out of range: len = %d, start = %d, offset = %d", + ln, p.start, p.length) + } else if _, err = p.writer.Write(obj.Payload[p.start : p.start+p.length]); err != nil { return nil, err } - var ( - headerReceived bool + // remove payload: + obj.Payload = nil - buf = new(bytes.Buffer) - objHeader = new(object.Object) - ) - - for { - resp, err := getClient.Recv() - if err != nil { - if err == io.EOF { - break - } - - return nil, err - } - - if !headerReceived { - objHeader = resp.GetObject() - - _, hdr := objHeader.LastHeader(object.HeaderType(object.TombstoneHdr)) - if hdr != nil { - return nil, errors.New("object already removed") - } - - _, err = buf.Write(objHeader.Payload) - if err != nil && err != io.EOF { - return nil, err - } - - headerReceived = true - - continue - } - - chunk := resp.GetChunk() - - _, err = buf.Write(chunk) - if err != nil && err != io.EOF { - return nil, err - } - } - - buf = bytes.NewBuffer(buf.Bytes()[p.start : p.start+p.length]) - _, err = io.Copy(p.writer, buf) - - return objHeader, err + return obj, nil } // objectPut into neofs, took payload from io.Reader.