Merge pull request #9 from nspcc-dev/fixes-for-object-receive

Simplify receiving object from NeoFS
remotes/KirillovDenis/bugfix/681-fix_acl_parsing
Evgeniy Kulikov 2020-08-05 16:52:43 +03:00 committed by GitHub
commit f055e7d269
2 changed files with 62 additions and 85 deletions

View File

@ -124,42 +124,7 @@ func (n *layer) Get(ctx context.Context, address refs.Address) (*object.Object,
return nil, err
}
var (
off int
obj *object.Object
)
for {
resp, err := cli.Recv()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
switch o := resp.R.(type) {
case *object.GetResponse_Object:
obj = o.Object
_, hdr := obj.LastHeader(object.HeaderType(object.TombstoneHdr))
if hdr != nil {
return nil, errors.New("object already removed")
}
obj.Payload = make([]byte, obj.SystemHeader.PayloadLength)
case *object.GetResponse_Chunk:
if obj == nil {
return nil, errors.New("object headers not received")
}
off += copy(obj.Payload[off:], o.Chunk)
default:
return nil, errors.Errorf("unknown response %T", o)
}
}
return obj, nil
return receiveObject(cli)
}
// GetBucketInfo returns bucket name.

View File

@ -1,7 +1,6 @@
package layer
import (
"bytes"
"context"
"io"
"time"
@ -260,6 +259,54 @@ func (n *layer) objectHead(ctx context.Context, addr refs.Address) (*object.Obje
return res.Object, nil
}
func receiveObject(cli object.Service_GetClient) (*object.Object, error) {
var (
off int
buf []byte
obj *object.Object
)
for {
resp, err := cli.Recv()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
switch o := resp.R.(type) {
case *object.GetResponse_Object:
if _, hdr := o.Object.LastHeader(object.HeaderType(object.TombstoneHdr)); hdr != nil {
return nil, errors.New("object already removed")
}
obj = o.Object
buf = make([]byte, obj.SystemHeader.PayloadLength)
if len(obj.Payload) > 0 {
off += copy(buf, obj.Payload)
}
case *object.GetResponse_Chunk:
if obj == nil {
return nil, errors.New("object headers not received")
}
off += copy(buf[off:], o.Chunk)
default:
return nil, errors.Errorf("unknown response %T", o)
}
}
if obj == nil {
return nil, errors.New("object headers not received")
}
obj.Payload = buf
return obj, nil
}
// objectGet and write it into provided io.Reader.
func (n *layer) objectGet(ctx context.Context, p getParams) (*object.Object, error) {
conn, err := n.cli.GetConnection(ctx)
@ -294,58 +341,23 @@ func (n *layer) objectGet(ctx context.Context, p getParams) (*object.Object, err
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
getClient, err := object.NewServiceClient(conn).Get(ctx, req)
if err != nil {
var obj *object.Object
if cli, err := object.NewServiceClient(conn).Get(ctx, req); err != nil {
return nil, err
} else if obj, err = receiveObject(cli); err != nil {
return nil, err
} else if ln := int64(obj.SystemHeader.PayloadLength); p.start+p.length > ln {
return nil, errors.Errorf("slice bounds out of range: len = %d, start = %d, offset = %d",
ln, p.start, p.length)
} else if _, err = p.writer.Write(obj.Payload[p.start : p.start+p.length]); err != nil {
return nil, err
}
var (
headerReceived bool
// remove payload:
obj.Payload = nil
buf = new(bytes.Buffer)
objHeader = new(object.Object)
)
for {
resp, err := getClient.Recv()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
if !headerReceived {
objHeader = resp.GetObject()
_, hdr := objHeader.LastHeader(object.HeaderType(object.TombstoneHdr))
if hdr != nil {
return nil, errors.New("object already removed")
}
_, err = buf.Write(objHeader.Payload)
if err != nil && err != io.EOF {
return nil, err
}
headerReceived = true
continue
}
chunk := resp.GetChunk()
_, err = buf.Write(chunk)
if err != nil && err != io.EOF {
return nil, err
}
}
buf = bytes.NewBuffer(buf.Bytes()[p.start : p.start+p.length])
_, err = io.Copy(p.writer, buf)
return objHeader, err
return obj, nil
}
// objectPut into neofs, took payload from io.Reader.