[#63] poc: Fast multipart upload #157

Merged
alexvanin merged 1 commit from dkirillov/frostfs-s3-gw:feature/63-fast_multipart_upload into master 2023-07-26 21:08:01 +00:00
3 changed files with 227 additions and 32 deletions

View file

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@ -14,54 +15,124 @@ type partObj struct {
Size uint64
}
type readerInitiator interface {
initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error)
}
// implements io.Reader of payloads of the object list stored in the FrostFS network.
type multiObjectReader struct {

Some comments will be appreciated. I guess it is the offset of complete object and total size of complete object.

Some comments will be appreciated. I guess it is the offset of _complete_ object and total size of _complete_ object.
ctx context.Context
layer *layer
layer readerInitiator
off, ln uint64
startPartOffset uint64
endPartLength uint64
prm getFrostFSParams
curIndex int
curReader io.Reader
parts []partObj
}
type multiObjectReaderConfig struct {
layer readerInitiator
// the offset of complete object and total size to read
off, ln uint64
bktInfo *data.BucketInfo
parts []partObj
}

What if len(x.parts) == 1 and after x.parts[1:] the slice is empty,and panic happens on x.parts[0].Size. Is it valid?

Also, is it possible to go negative in x.off here?

What if `len(x.parts) == 1` and after `x.parts[1:]` the slice is empty,and panic happens on `x.parts[0].Size`. Is it valid? Also, is it possible to go negative in `x.off` here?

Is it valid?

In current implementation we have check range in handler, and init reading from frostfs. So we cannot get such invalid value here.
But yes, theoretically panic can happen. I'll add test for that

Also, is it possible to go negative in x.off here?

No, we have check if x.parts[0].Size < x.off above

> Is it valid? In current implementation we have check range in handler, and init reading from frostfs. So we cannot get such invalid value here. But yes, theoretically panic can happen. I'll add test for that > Also, is it possible to go negative in x.off here? No, we have check `if x.parts[0].Size < x.off` above
var (
errOffsetIsOutOfRange = errors.New("offset is out of payload range")
errLengthIsOutOfRange = errors.New("length is out of payload range")
errEmptyPartsList = errors.New("empty parts list")
errorZeroRangeLength = errors.New("zero range length")
)
func newMultiObjectReader(ctx context.Context, cfg multiObjectReaderConfig) (*multiObjectReader, error) {
if len(cfg.parts) == 0 {
return nil, errEmptyPartsList
}
r := &multiObjectReader{
ctx: ctx,
layer: cfg.layer,
prm: getFrostFSParams{
bktInfo: cfg.bktInfo,
},
parts: cfg.parts,
}
if cfg.off+cfg.ln == 0 {
return r, nil
}
if cfg.off > 0 && cfg.ln == 0 {
return nil, errorZeroRangeLength
}

Should it be recursive and not x.curReader.Read? What is the max recursion deepness here? As far as I see the max deepness is one, because x.curReader is set above and we expect early return from the recursive function.

Should it be recursive and not `x.curReader.Read`? What is the max recursion deepness here? As far as I see the max deepness is one, because `x.curReader` is set above and we expect early return from the recursive function.

Should it be recursive and not x.curReader.Read?

It seems it should be. We have to handle the case when we ended read one part and must start another.

What is the max recursion deepness here?

The max deepness is number of parts (if p is large enough to contain all parts payload at once)

> Should it be recursive and not `x.curReader.Read`? It seems it should be. We have to handle the case when we ended read one part and must start another. > What is the max recursion deepness here? The max deepness is number of parts (if `p` is large enough to contain all parts payload at once)
startPartIndex, startPartOffset := findStartPart(cfg)
if startPartIndex == -1 {
return nil, errOffsetIsOutOfRange
}
r.startPartOffset = startPartOffset
endPartIndex, endPartLength := findEndPart(cfg)
if endPartIndex == -1 {
return nil, errLengthIsOutOfRange
}
r.endPartLength = endPartLength
r.parts = cfg.parts[startPartIndex : endPartIndex+1]
return r, nil
}
func findStartPart(cfg multiObjectReaderConfig) (index int, offset uint64) {
return findPartByPosition(cfg.off, cfg.parts)
}
func findEndPart(cfg multiObjectReaderConfig) (index int, length uint64) {
return findPartByPosition(cfg.off+cfg.ln, cfg.parts)
}
func findPartByPosition(position uint64, parts []partObj) (index int, positionInPart uint64) {
for i, part := range parts {
if position <= part.Size {
return i, position
}
position -= part.Size
}
return -1, 0
}
func (x *multiObjectReader) Read(p []byte) (n int, err error) {
if x.curReader != nil {
n, err = x.curReader.Read(p)
if !errors.Is(err, io.EOF) {
return n, err
}
x.curIndex++
}
if len(x.parts) == 0 {
if x.curIndex == len(x.parts) {
return n, io.EOF
}
for x.off != 0 {
if x.parts[0].Size < x.off {
x.parts = x.parts[1:]
x.off -= x.parts[0].Size
} else {
x.prm.off = x.off
x.off = 0
}
x.prm.oid = x.parts[x.curIndex].OID
if x.curIndex == 0 {
x.prm.off = x.startPartOffset
x.prm.ln = x.parts[x.curIndex].Size - x.startPartOffset
}
x.prm.oid = x.parts[0].OID
if x.ln != 0 {
if x.parts[0].Size < x.prm.off+x.ln {
x.prm.ln = x.parts[0].Size - x.prm.off
x.ln -= x.prm.ln
} else {
x.prm.ln = x.ln
x.ln = 0
x.parts = x.parts[:1]
}
if x.curIndex == len(x.parts)-1 {
x.prm.ln = x.endPartLength - x.prm.off
}
x.curReader, err = x.layer.initFrostFSObjectPayloadReader(x.ctx, x.prm)
@ -72,8 +143,6 @@ func (x *multiObjectReader) Read(p []byte) (n int, err error) {
x.prm.off = 0
x.prm.ln = 0
x.parts = x.parts[1:]
next, err := x.Read(p[n:])
return n + next, err

View file

@ -0,0 +1,127 @@
package layer
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"testing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
type readerInitiatorMock struct {
parts map[oid.ID][]byte
}
func (r *readerInitiatorMock) initFrostFSObjectPayloadReader(_ context.Context, p getFrostFSParams) (io.Reader, error) {
partPayload, ok := r.parts[p.oid]
if !ok {
return nil, errors.New("part not found")
}
if p.off+p.ln == 0 {
return bytes.NewReader(partPayload), nil
}
if p.off > uint64(len(partPayload)-1) {
return nil, fmt.Errorf("invalid offset: %d/%d", p.off, len(partPayload))
}
if p.off+p.ln > uint64(len(partPayload)) {
return nil, fmt.Errorf("invalid range: %d-%d/%d", p.off, p.off+p.ln, len(partPayload))
}
return bytes.NewReader(partPayload[p.off : p.off+p.ln]), nil
}
func prepareDataReader() ([]byte, []partObj, *readerInitiatorMock) {
mockInitReader := &readerInitiatorMock{
parts: map[oid.ID][]byte{
oidtest.ID(): []byte("first part 1"),
oidtest.ID(): []byte("second part 2"),
oidtest.ID(): []byte("third part 3"),
},
}
var fullPayload []byte
parts := make([]partObj, 0, len(mockInitReader.parts))
for id, payload := range mockInitReader.parts {
parts = append(parts, partObj{OID: id, Size: uint64(len(payload))})
fullPayload = append(fullPayload, payload...)
}
return fullPayload, parts, mockInitReader
}
func TestMultiReader(t *testing.T) {
ctx := context.Background()
fullPayload, parts, mockInitReader := prepareDataReader()
for _, tc := range []struct {
name string
off uint64
ln uint64
err error
}{
{
name: "simple read all",
},
{
name: "simple read with length",
ln: uint64(len(fullPayload)),
},
{
name: "middle of parts",
off: parts[0].Size + 2,
ln: 4,
},
{
name: "first and second",
off: parts[0].Size - 4,
ln: 8,
},
{
name: "first and third",
off: parts[0].Size - 4,
ln: parts[1].Size + 8,
},
{
name: "offset out of range",
off: uint64(len(fullPayload) + 1),
ln: 1,
err: errOffsetIsOutOfRange,
},
{
name: "zero length",
off: parts[1].Size + 1,
ln: 0,
err: errorZeroRangeLength,
},
} {
t.Run(tc.name, func(t *testing.T) {
multiReader, err := newMultiObjectReader(ctx, multiObjectReaderConfig{
layer: mockInitReader,
parts: parts,
off: tc.off,
ln: tc.ln,
})
require.ErrorIs(t, err, tc.err)
if tc.err == nil {
off := tc.off
ln := tc.ln
if off+ln == 0 {
ln = uint64(len(fullPayload))
}
data, err := io.ReadAll(multiReader)
require.NoError(t, err)
require.Equal(t, fullPayload[off:off+ln], data)
}
})
}
}

View file

@ -143,14 +143,13 @@ func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
}
}
return &multiObjectReader{
ctx: ctx,
off: p.off,
ln: p.ln,
layer: n,
parts: objParts,
prm: getFrostFSParams{bktInfo: p.bktInfo},
}, nil
return newMultiObjectReader(ctx, multiObjectReaderConfig{
layer: n,
off: p.off,
ln: p.ln,
parts: objParts,
bktInfo: p.bktInfo,
})
}
// initializes payload reader of the FrostFS object.