[#63] poc: Fast multipart upload #157
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
|
@ -14,54 +15,124 @@ type partObj struct {
|
|||
Size uint64
|
||||
}
|
||||
|
||||
type readerInitiator interface {
|
||||
initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error)
|
||||
}
|
||||
|
||||
// implements io.Reader of payloads of the object list stored in the FrostFS network.
|
||||
type multiObjectReader struct {
|
||||
|
||||
ctx context.Context
|
||||
|
||||
layer *layer
|
||||
layer readerInitiator
|
||||
|
||||
off, ln uint64
|
||||
startPartOffset uint64
|
||||
endPartLength uint64
|
||||
|
||||
prm getFrostFSParams
|
||||
|
||||
curIndex int
|
||||
curReader io.Reader
|
||||
|
||||
parts []partObj
|
||||
}
|
||||
|
||||
type multiObjectReaderConfig struct {
|
||||
layer readerInitiator
|
||||
|
||||
// the offset of complete object and total size to read
|
||||
off, ln uint64
|
||||
|
||||
bktInfo *data.BucketInfo
|
||||
parts []partObj
|
||||
}
|
||||
alexvanin
commented
What if Also, is it possible to go negative in What if `len(x.parts) == 1` and after `x.parts[1:]` the slice is empty,and panic happens on `x.parts[0].Size`. Is it valid?
Also, is it possible to go negative in `x.off` here?
dkirillov
commented
In current implementation we have check range in handler, and init reading from frostfs. So we cannot get such invalid value here.
No, we have check > Is it valid?
In current implementation we have check range in handler, and init reading from frostfs. So we cannot get such invalid value here.
But yes, theoretically panic can happen. I'll add test for that
> Also, is it possible to go negative in x.off here?
No, we have check `if x.parts[0].Size < x.off` above
|
||||
|
||||
var (
|
||||
errOffsetIsOutOfRange = errors.New("offset is out of payload range")
|
||||
errLengthIsOutOfRange = errors.New("length is out of payload range")
|
||||
errEmptyPartsList = errors.New("empty parts list")
|
||||
errorZeroRangeLength = errors.New("zero range length")
|
||||
)
|
||||
|
||||
func newMultiObjectReader(ctx context.Context, cfg multiObjectReaderConfig) (*multiObjectReader, error) {
|
||||
if len(cfg.parts) == 0 {
|
||||
return nil, errEmptyPartsList
|
||||
}
|
||||
|
||||
r := &multiObjectReader{
|
||||
ctx: ctx,
|
||||
layer: cfg.layer,
|
||||
prm: getFrostFSParams{
|
||||
bktInfo: cfg.bktInfo,
|
||||
},
|
||||
parts: cfg.parts,
|
||||
}
|
||||
|
||||
if cfg.off+cfg.ln == 0 {
|
||||
return r, nil
|
||||
}
|
||||
|
||||
if cfg.off > 0 && cfg.ln == 0 {
|
||||
return nil, errorZeroRangeLength
|
||||
}
|
||||
|
||||
alexvanin
commented
Should it be recursive and not Should it be recursive and not `x.curReader.Read`? What is the max recursion deepness here? As far as I see the max deepness is one, because `x.curReader` is set above and we expect early return from the recursive function.
dkirillov
commented
It seems it should be. We have to handle the case when we ended read one part and must start another.
The max deepness is number of parts (if > Should it be recursive and not `x.curReader.Read`?
It seems it should be. We have to handle the case when we ended read one part and must start another.
> What is the max recursion deepness here?
The max deepness is number of parts (if `p` is large enough to contain all parts payload at once)
|
||||
startPartIndex, startPartOffset := findStartPart(cfg)
|
||||
if startPartIndex == -1 {
|
||||
return nil, errOffsetIsOutOfRange
|
||||
}
|
||||
r.startPartOffset = startPartOffset
|
||||
|
||||
endPartIndex, endPartLength := findEndPart(cfg)
|
||||
if endPartIndex == -1 {
|
||||
return nil, errLengthIsOutOfRange
|
||||
}
|
||||
r.endPartLength = endPartLength
|
||||
|
||||
r.parts = cfg.parts[startPartIndex : endPartIndex+1]
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func findStartPart(cfg multiObjectReaderConfig) (index int, offset uint64) {
|
||||
return findPartByPosition(cfg.off, cfg.parts)
|
||||
}
|
||||
|
||||
func findEndPart(cfg multiObjectReaderConfig) (index int, length uint64) {
|
||||
return findPartByPosition(cfg.off+cfg.ln, cfg.parts)
|
||||
}
|
||||
|
||||
func findPartByPosition(position uint64, parts []partObj) (index int, positionInPart uint64) {
|
||||
for i, part := range parts {
|
||||
if position <= part.Size {
|
||||
return i, position
|
||||
}
|
||||
position -= part.Size
|
||||
}
|
||||
|
||||
return -1, 0
|
||||
}
|
||||
|
||||
func (x *multiObjectReader) Read(p []byte) (n int, err error) {
|
||||
if x.curReader != nil {
|
||||
n, err = x.curReader.Read(p)
|
||||
if !errors.Is(err, io.EOF) {
|
||||
return n, err
|
||||
}
|
||||
x.curIndex++
|
||||
}
|
||||
|
||||
if len(x.parts) == 0 {
|
||||
if x.curIndex == len(x.parts) {
|
||||
return n, io.EOF
|
||||
}
|
||||
|
||||
for x.off != 0 {
|
||||
if x.parts[0].Size < x.off {
|
||||
x.parts = x.parts[1:]
|
||||
x.off -= x.parts[0].Size
|
||||
} else {
|
||||
x.prm.off = x.off
|
||||
x.off = 0
|
||||
}
|
||||
x.prm.oid = x.parts[x.curIndex].OID
|
||||
|
||||
if x.curIndex == 0 {
|
||||
x.prm.off = x.startPartOffset
|
||||
x.prm.ln = x.parts[x.curIndex].Size - x.startPartOffset
|
||||
}
|
||||
|
||||
x.prm.oid = x.parts[0].OID
|
||||
|
||||
if x.ln != 0 {
|
||||
if x.parts[0].Size < x.prm.off+x.ln {
|
||||
x.prm.ln = x.parts[0].Size - x.prm.off
|
||||
x.ln -= x.prm.ln
|
||||
} else {
|
||||
x.prm.ln = x.ln
|
||||
x.ln = 0
|
||||
x.parts = x.parts[:1]
|
||||
}
|
||||
if x.curIndex == len(x.parts)-1 {
|
||||
x.prm.ln = x.endPartLength - x.prm.off
|
||||
}
|
||||
|
||||
x.curReader, err = x.layer.initFrostFSObjectPayloadReader(x.ctx, x.prm)
|
||||
|
@ -72,8 +143,6 @@ func (x *multiObjectReader) Read(p []byte) (n int, err error) {
|
|||
x.prm.off = 0
|
||||
x.prm.ln = 0
|
||||
|
||||
x.parts = x.parts[1:]
|
||||
|
||||
next, err := x.Read(p[n:])
|
||||
|
||||
return n + next, err
|
||||
|
|
127
api/layer/multi_object_reader_test.go
Normal file
|
@ -0,0 +1,127 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type readerInitiatorMock struct {
|
||||
parts map[oid.ID][]byte
|
||||
}
|
||||
|
||||
func (r *readerInitiatorMock) initFrostFSObjectPayloadReader(_ context.Context, p getFrostFSParams) (io.Reader, error) {
|
||||
partPayload, ok := r.parts[p.oid]
|
||||
if !ok {
|
||||
return nil, errors.New("part not found")
|
||||
}
|
||||
|
||||
if p.off+p.ln == 0 {
|
||||
return bytes.NewReader(partPayload), nil
|
||||
}
|
||||
|
||||
if p.off > uint64(len(partPayload)-1) {
|
||||
return nil, fmt.Errorf("invalid offset: %d/%d", p.off, len(partPayload))
|
||||
}
|
||||
|
||||
if p.off+p.ln > uint64(len(partPayload)) {
|
||||
return nil, fmt.Errorf("invalid range: %d-%d/%d", p.off, p.off+p.ln, len(partPayload))
|
||||
}
|
||||
|
||||
return bytes.NewReader(partPayload[p.off : p.off+p.ln]), nil
|
||||
}
|
||||
|
||||
func prepareDataReader() ([]byte, []partObj, *readerInitiatorMock) {
|
||||
mockInitReader := &readerInitiatorMock{
|
||||
parts: map[oid.ID][]byte{
|
||||
oidtest.ID(): []byte("first part 1"),
|
||||
oidtest.ID(): []byte("second part 2"),
|
||||
oidtest.ID(): []byte("third part 3"),
|
||||
},
|
||||
}
|
||||
|
||||
var fullPayload []byte
|
||||
parts := make([]partObj, 0, len(mockInitReader.parts))
|
||||
for id, payload := range mockInitReader.parts {
|
||||
parts = append(parts, partObj{OID: id, Size: uint64(len(payload))})
|
||||
fullPayload = append(fullPayload, payload...)
|
||||
}
|
||||
|
||||
return fullPayload, parts, mockInitReader
|
||||
}
|
||||
|
||||
func TestMultiReader(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
fullPayload, parts, mockInitReader := prepareDataReader()
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
off uint64
|
||||
ln uint64
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "simple read all",
|
||||
},
|
||||
{
|
||||
name: "simple read with length",
|
||||
ln: uint64(len(fullPayload)),
|
||||
},
|
||||
{
|
||||
name: "middle of parts",
|
||||
off: parts[0].Size + 2,
|
||||
ln: 4,
|
||||
},
|
||||
{
|
||||
name: "first and second",
|
||||
off: parts[0].Size - 4,
|
||||
ln: 8,
|
||||
},
|
||||
{
|
||||
name: "first and third",
|
||||
off: parts[0].Size - 4,
|
||||
ln: parts[1].Size + 8,
|
||||
},
|
||||
{
|
||||
name: "offset out of range",
|
||||
off: uint64(len(fullPayload) + 1),
|
||||
ln: 1,
|
||||
err: errOffsetIsOutOfRange,
|
||||
},
|
||||
{
|
||||
name: "zero length",
|
||||
off: parts[1].Size + 1,
|
||||
ln: 0,
|
||||
err: errorZeroRangeLength,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
multiReader, err := newMultiObjectReader(ctx, multiObjectReaderConfig{
|
||||
layer: mockInitReader,
|
||||
parts: parts,
|
||||
off: tc.off,
|
||||
ln: tc.ln,
|
||||
})
|
||||
require.ErrorIs(t, err, tc.err)
|
||||
|
||||
if tc.err == nil {
|
||||
off := tc.off
|
||||
ln := tc.ln
|
||||
if off+ln == 0 {
|
||||
ln = uint64(len(fullPayload))
|
||||
}
|
||||
data, err := io.ReadAll(multiReader)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, fullPayload[off:off+ln], data)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -143,14 +143,13 @@ func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
|
|||
}
|
||||
}
|
||||
|
||||
return &multiObjectReader{
|
||||
ctx: ctx,
|
||||
off: p.off,
|
||||
ln: p.ln,
|
||||
layer: n,
|
||||
parts: objParts,
|
||||
prm: getFrostFSParams{bktInfo: p.bktInfo},
|
||||
}, nil
|
||||
return newMultiObjectReader(ctx, multiObjectReaderConfig{
|
||||
layer: n,
|
||||
off: p.off,
|
||||
ln: p.ln,
|
||||
parts: objParts,
|
||||
bktInfo: p.bktInfo,
|
||||
})
|
||||
}
|
||||
|
||||
// initializes payload reader of the FrostFS object.
|
||||
|
|
Some comments will be appreciated. I guess it is the offset of complete object and total size of complete object.