Simplify receiving object from NeoFS
- add function to receive object with payload - simplify receiving object for layer.Get - simplify receiving object for layer.objectGet Signed-off-by: Evgeniy Kulikov <kim@nspcc.ru>
This commit is contained in:
parent
06c9a126d1
commit
06df8642b3
2 changed files with 62 additions and 85 deletions
|
@ -124,42 +124,7 @@ func (n *layer) Get(ctx context.Context, address refs.Address) (*object.Object,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
off int
|
||||
obj *object.Object
|
||||
)
|
||||
|
||||
for {
|
||||
resp, err := cli.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch o := resp.R.(type) {
|
||||
case *object.GetResponse_Object:
|
||||
obj = o.Object
|
||||
|
||||
_, hdr := obj.LastHeader(object.HeaderType(object.TombstoneHdr))
|
||||
if hdr != nil {
|
||||
return nil, errors.New("object already removed")
|
||||
}
|
||||
|
||||
obj.Payload = make([]byte, obj.SystemHeader.PayloadLength)
|
||||
case *object.GetResponse_Chunk:
|
||||
if obj == nil {
|
||||
return nil, errors.New("object headers not received")
|
||||
}
|
||||
off += copy(obj.Payload[off:], o.Chunk)
|
||||
default:
|
||||
return nil, errors.Errorf("unknown response %T", o)
|
||||
}
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
return receiveObject(cli)
|
||||
}
|
||||
|
||||
// GetBucketInfo returns bucket name.
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
@ -260,6 +259,54 @@ func (n *layer) objectHead(ctx context.Context, addr refs.Address) (*object.Obje
|
|||
return res.Object, nil
|
||||
}
|
||||
|
||||
func receiveObject(cli object.Service_GetClient) (*object.Object, error) {
|
||||
var (
|
||||
off int
|
||||
buf []byte
|
||||
obj *object.Object
|
||||
)
|
||||
|
||||
for {
|
||||
resp, err := cli.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch o := resp.R.(type) {
|
||||
case *object.GetResponse_Object:
|
||||
|
||||
if _, hdr := o.Object.LastHeader(object.HeaderType(object.TombstoneHdr)); hdr != nil {
|
||||
return nil, errors.New("object already removed")
|
||||
}
|
||||
|
||||
obj = o.Object
|
||||
buf = make([]byte, obj.SystemHeader.PayloadLength)
|
||||
|
||||
if len(obj.Payload) > 0 {
|
||||
off += copy(buf, obj.Payload)
|
||||
}
|
||||
case *object.GetResponse_Chunk:
|
||||
if obj == nil {
|
||||
return nil, errors.New("object headers not received")
|
||||
}
|
||||
off += copy(buf[off:], o.Chunk)
|
||||
default:
|
||||
return nil, errors.Errorf("unknown response %T", o)
|
||||
}
|
||||
}
|
||||
|
||||
if obj == nil {
|
||||
return nil, errors.New("object headers not received")
|
||||
}
|
||||
obj.Payload = buf
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// objectGet and write it into provided io.Reader.
|
||||
func (n *layer) objectGet(ctx context.Context, p getParams) (*object.Object, error) {
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
|
@ -294,58 +341,23 @@ func (n *layer) objectGet(ctx context.Context, p getParams) (*object.Object, err
|
|||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
getClient, err := object.NewServiceClient(conn).Get(ctx, req)
|
||||
if err != nil {
|
||||
var obj *object.Object
|
||||
|
||||
if cli, err := object.NewServiceClient(conn).Get(ctx, req); err != nil {
|
||||
return nil, err
|
||||
} else if obj, err = receiveObject(cli); err != nil {
|
||||
return nil, err
|
||||
} else if ln := int64(obj.SystemHeader.PayloadLength); p.start+p.length > ln {
|
||||
return nil, errors.Errorf("slice bounds out of range: len = %d, start = %d, offset = %d",
|
||||
ln, p.start, p.length)
|
||||
} else if _, err = p.writer.Write(obj.Payload[p.start : p.start+p.length]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
headerReceived bool
|
||||
// remove payload:
|
||||
obj.Payload = nil
|
||||
|
||||
buf = new(bytes.Buffer)
|
||||
objHeader = new(object.Object)
|
||||
)
|
||||
|
||||
for {
|
||||
resp, err := getClient.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !headerReceived {
|
||||
objHeader = resp.GetObject()
|
||||
|
||||
_, hdr := objHeader.LastHeader(object.HeaderType(object.TombstoneHdr))
|
||||
if hdr != nil {
|
||||
return nil, errors.New("object already removed")
|
||||
}
|
||||
|
||||
_, err = buf.Write(objHeader.Payload)
|
||||
if err != nil && err != io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
headerReceived = true
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
chunk := resp.GetChunk()
|
||||
|
||||
_, err = buf.Write(chunk)
|
||||
if err != nil && err != io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
buf = bytes.NewBuffer(buf.Bytes()[p.start : p.start+p.length])
|
||||
_, err = io.Copy(p.writer, buf)
|
||||
|
||||
return objHeader, err
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// objectPut into neofs, took payload from io.Reader.
|
||||
|
|
Loading…
Reference in a new issue