[#63] poc: Fast multipart upload #157
3 changed files with 227 additions and 32 deletions
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -14,54 +15,124 @@ type partObj struct {
|
||||||
Size uint64
|
Size uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type readerInitiator interface {
|
||||||
|
initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error)
|
||||||
|
}
|
||||||
|
|
||||||
// implements io.Reader of payloads of the object list stored in the FrostFS network.
|
// implements io.Reader of payloads of the object list stored in the FrostFS network.
|
||||||
type multiObjectReader struct {
|
type multiObjectReader struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
layer *layer
|
layer readerInitiator
|
||||||
|
|
||||||
off, ln uint64
|
startPartOffset uint64
|
||||||
|
endPartLength uint64
|
||||||
|
|
||||||
prm getFrostFSParams
|
prm getFrostFSParams
|
||||||
|
|
||||||
|
curIndex int
|
||||||
curReader io.Reader
|
curReader io.Reader
|
||||||
|
|
||||||
parts []partObj
|
parts []partObj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type multiObjectReaderConfig struct {
|
||||||
|
layer readerInitiator
|
||||||
|
|
||||||
|
// the offset of complete object and total size to read
|
||||||
|
off, ln uint64
|
||||||
|
|
||||||
|
bktInfo *data.BucketInfo
|
||||||
|
parts []partObj
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
errOffsetIsOutOfRange = errors.New("offset is out of payload range")
|
||||||
|
errLengthIsOutOfRange = errors.New("length is out of payload range")
|
||||||
|
errEmptyPartsList = errors.New("empty parts list")
|
||||||
|
errorZeroRangeLength = errors.New("zero range length")
|
||||||
|
)
|
||||||
|
|
||||||
|
func newMultiObjectReader(ctx context.Context, cfg multiObjectReaderConfig) (*multiObjectReader, error) {
|
||||||
|
if len(cfg.parts) == 0 {
|
||||||
|
return nil, errEmptyPartsList
|
||||||
|
}
|
||||||
|
|
||||||
|
r := &multiObjectReader{
|
||||||
|
ctx: ctx,
|
||||||
|
layer: cfg.layer,
|
||||||
|
prm: getFrostFSParams{
|
||||||
|
bktInfo: cfg.bktInfo,
|
||||||
|
},
|
||||||
|
parts: cfg.parts,
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.off+cfg.ln == 0 {
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.off > 0 && cfg.ln == 0 {
|
||||||
|
return nil, errorZeroRangeLength
|
||||||
|
}
|
||||||
|
|
||||||
|
startPartIndex, startPartOffset := findStartPart(cfg)
|
||||||
|
if startPartIndex == -1 {
|
||||||
|
return nil, errOffsetIsOutOfRange
|
||||||
|
}
|
||||||
|
r.startPartOffset = startPartOffset
|
||||||
|
|
||||||
|
endPartIndex, endPartLength := findEndPart(cfg)
|
||||||
|
if endPartIndex == -1 {
|
||||||
|
return nil, errLengthIsOutOfRange
|
||||||
|
}
|
||||||
|
r.endPartLength = endPartLength
|
||||||
|
|
||||||
|
r.parts = cfg.parts[startPartIndex : endPartIndex+1]
|
||||||
|
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func findStartPart(cfg multiObjectReaderConfig) (index int, offset uint64) {
|
||||||
|
return findPartByPosition(cfg.off, cfg.parts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func findEndPart(cfg multiObjectReaderConfig) (index int, length uint64) {
|
||||||
|
return findPartByPosition(cfg.off+cfg.ln, cfg.parts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func findPartByPosition(position uint64, parts []partObj) (index int, positionInPart uint64) {
|
||||||
|
for i, part := range parts {
|
||||||
|
if position <= part.Size {
|
||||||
|
return i, position
|
||||||
|
}
|
||||||
|
position -= part.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1, 0
|
||||||
|
}
|
||||||
|
|
||||||
func (x *multiObjectReader) Read(p []byte) (n int, err error) {
|
func (x *multiObjectReader) Read(p []byte) (n int, err error) {
|
||||||
if x.curReader != nil {
|
if x.curReader != nil {
|
||||||
n, err = x.curReader.Read(p)
|
n, err = x.curReader.Read(p)
|
||||||
if !errors.Is(err, io.EOF) {
|
if !errors.Is(err, io.EOF) {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
x.curIndex++
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(x.parts) == 0 {
|
if x.curIndex == len(x.parts) {
|
||||||
return n, io.EOF
|
return n, io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
for x.off != 0 {
|
x.prm.oid = x.parts[x.curIndex].OID
|
||||||
if x.parts[0].Size < x.off {
|
|
||||||
x.parts = x.parts[1:]
|
if x.curIndex == 0 {
|
||||||
x.off -= x.parts[0].Size
|
x.prm.off = x.startPartOffset
|
||||||
} else {
|
x.prm.ln = x.parts[x.curIndex].Size - x.startPartOffset
|
||||||
x.prm.off = x.off
|
|
||||||
x.off = 0
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
x.prm.oid = x.parts[0].OID
|
if x.curIndex == len(x.parts)-1 {
|
||||||
|
x.prm.ln = x.endPartLength - x.prm.off
|
||||||
if x.ln != 0 {
|
|
||||||
if x.parts[0].Size < x.prm.off+x.ln {
|
|
||||||
x.prm.ln = x.parts[0].Size - x.prm.off
|
|
||||||
x.ln -= x.prm.ln
|
|
||||||
} else {
|
|
||||||
x.prm.ln = x.ln
|
|
||||||
x.ln = 0
|
|
||||||
x.parts = x.parts[:1]
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
x.curReader, err = x.layer.initFrostFSObjectPayloadReader(x.ctx, x.prm)
|
x.curReader, err = x.layer.initFrostFSObjectPayloadReader(x.ctx, x.prm)
|
||||||
|
@ -72,8 +143,6 @@ func (x *multiObjectReader) Read(p []byte) (n int, err error) {
|
||||||
x.prm.off = 0
|
x.prm.off = 0
|
||||||
x.prm.ln = 0
|
x.prm.ln = 0
|
||||||
|
|
||||||
x.parts = x.parts[1:]
|
|
||||||
|
|
||||||
next, err := x.Read(p[n:])
|
next, err := x.Read(p[n:])
|
||||||
|
|
||||||
return n + next, err
|
return n + next, err
|
||||||
|
|
127
api/layer/multi_object_reader_test.go
Normal file
127
api/layer/multi_object_reader_test.go
Normal file
|
@ -0,0 +1,127 @@
|
||||||
|
package layer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type readerInitiatorMock struct {
|
||||||
|
parts map[oid.ID][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *readerInitiatorMock) initFrostFSObjectPayloadReader(_ context.Context, p getFrostFSParams) (io.Reader, error) {
|
||||||
|
partPayload, ok := r.parts[p.oid]
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("part not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.off+p.ln == 0 {
|
||||||
|
return bytes.NewReader(partPayload), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.off > uint64(len(partPayload)-1) {
|
||||||
|
return nil, fmt.Errorf("invalid offset: %d/%d", p.off, len(partPayload))
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.off+p.ln > uint64(len(partPayload)) {
|
||||||
|
return nil, fmt.Errorf("invalid range: %d-%d/%d", p.off, p.off+p.ln, len(partPayload))
|
||||||
|
}
|
||||||
|
|
||||||
|
return bytes.NewReader(partPayload[p.off : p.off+p.ln]), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareDataReader() ([]byte, []partObj, *readerInitiatorMock) {
|
||||||
|
mockInitReader := &readerInitiatorMock{
|
||||||
|
parts: map[oid.ID][]byte{
|
||||||
|
oidtest.ID(): []byte("first part 1"),
|
||||||
|
oidtest.ID(): []byte("second part 2"),
|
||||||
|
oidtest.ID(): []byte("third part 3"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var fullPayload []byte
|
||||||
|
parts := make([]partObj, 0, len(mockInitReader.parts))
|
||||||
|
for id, payload := range mockInitReader.parts {
|
||||||
|
parts = append(parts, partObj{OID: id, Size: uint64(len(payload))})
|
||||||
|
fullPayload = append(fullPayload, payload...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fullPayload, parts, mockInitReader
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiReader(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
fullPayload, parts, mockInitReader := prepareDataReader()
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
off uint64
|
||||||
|
ln uint64
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple read all",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "simple read with length",
|
||||||
|
ln: uint64(len(fullPayload)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "middle of parts",
|
||||||
|
off: parts[0].Size + 2,
|
||||||
|
ln: 4,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "first and second",
|
||||||
|
off: parts[0].Size - 4,
|
||||||
|
ln: 8,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "first and third",
|
||||||
|
off: parts[0].Size - 4,
|
||||||
|
ln: parts[1].Size + 8,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "offset out of range",
|
||||||
|
off: uint64(len(fullPayload) + 1),
|
||||||
|
ln: 1,
|
||||||
|
err: errOffsetIsOutOfRange,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "zero length",
|
||||||
|
off: parts[1].Size + 1,
|
||||||
|
ln: 0,
|
||||||
|
err: errorZeroRangeLength,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
multiReader, err := newMultiObjectReader(ctx, multiObjectReaderConfig{
|
||||||
|
layer: mockInitReader,
|
||||||
|
parts: parts,
|
||||||
|
off: tc.off,
|
||||||
|
ln: tc.ln,
|
||||||
|
})
|
||||||
|
require.ErrorIs(t, err, tc.err)
|
||||||
|
|
||||||
|
if tc.err == nil {
|
||||||
|
off := tc.off
|
||||||
|
ln := tc.ln
|
||||||
|
if off+ln == 0 {
|
||||||
|
ln = uint64(len(fullPayload))
|
||||||
|
}
|
||||||
|
data, err := io.ReadAll(multiReader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, fullPayload[off:off+ln], data)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -143,14 +143,13 @@ func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &multiObjectReader{
|
return newMultiObjectReader(ctx, multiObjectReaderConfig{
|
||||||
ctx: ctx,
|
layer: n,
|
||||||
off: p.off,
|
off: p.off,
|
||||||
ln: p.ln,
|
ln: p.ln,
|
||||||
layer: n,
|
parts: objParts,
|
||||||
parts: objParts,
|
bktInfo: p.bktInfo,
|
||||||
prm: getFrostFSParams{bktInfo: p.bktInfo},
|
})
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// initializes payload reader of the FrostFS object.
|
// initializes payload reader of the FrostFS object.
|
||||||
|
|
Loading…
Reference in a new issue