[#142] Fix multipart-objects download #143

Merged
alexvanin merged 1 commit from nzinkevich/frostfs-http-gw:fix_multipart into master 2024-10-17 11:00:23 +00:00
11 changed files with 517 additions and 62 deletions

View file

@ -19,11 +19,10 @@ import (
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
@ -453,7 +452,7 @@ func (a *app) setHealthStatus() {
} }
func (a *app) Serve() { func (a *app) Serve() {
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(services.NewPoolWrapper(a.treePool))) handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)))
// Configure router. // Configure router.
a.configureRouter(handler) a.configureRouter(handler)

View file

@ -229,6 +229,10 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (Res
return &resObjectSearchMock{res: res}, nil return &resObjectSearchMock{res: res}, nil
} }
func (t *TestFrostFS) InitMultiObjectReader(context.Context, PrmInitMultiObjectReader) (io.Reader, error) {
return nil, nil
}
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool { func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
for _, attr := range attributes { for _, attr := range attributes {
if attr.Key() == filter.Header() { if attr.Key() == filter.Header() {
@ -269,10 +273,3 @@ func (t *TestFrostFS) isAllowed(cnrID cid.ID, userID user.ID, op acl.Op, objID o
} }
return false return false
} }
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(obj)
return addr
}

View file

@ -119,6 +119,14 @@ type PrmObjectSearch struct {
Filters object.SearchFilters Filters object.SearchFilters
} }
type PrmInitMultiObjectReader struct {
// payload range
Off, Ln uint64
Addr oid.Address
dkirillov marked this conversation as resolved Outdated

Why do we need ObjectInfo It seems we can keep only object address.
Also BktInfo is unnecessary

Why do we need `ObjectInfo` It seems we can keep only object address. Also `BktInfo` is unnecessary
Bearer *bearer.Token
}
dkirillov marked this conversation as resolved Outdated

Why do we need Log?

Why do we need `Log`?
type ResObjectSearch interface { type ResObjectSearch interface {
Read(buf []oid.ID) (int, error) Read(buf []oid.ID) (int, error)
Iterate(f func(oid.ID) bool) error Iterate(f func(oid.ID) bool) error
@ -140,6 +148,8 @@ type FrostFS interface {
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error) RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error) SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error)
utils.EpochInfoFetcher utils.EpochInfoFetcher
} }
@ -201,9 +211,7 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
return return
} }
var addr oid.Address addr := newAddress(bktInfo.CID, *objID)
addr.SetContainer(bktInfo.CID)
addr.SetObject(*objID)
f(ctx, *h.newRequest(c, log), addr) f(ctx, *h.newRequest(c, log), addr)
} }
@ -256,10 +264,7 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
response.Error(c, "object deleted", fasthttp.StatusNotFound) response.Error(c, "object deleted", fasthttp.StatusNotFound)
return return
} }
addr := newAddress(bktInfo.CID, foundOid.OID)
var addr oid.Address
addr.SetContainer(bktInfo.CID)
addr.SetObject(foundOid.OID)
f(ctx, *h.newRequest(c, log), addr) f(ctx, *h.newRequest(c, log), addr)
} }

View file

@ -1,13 +1,17 @@
package handler package handler
import ( import (
"errors"
"io" "io"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"go.uber.org/zap" "go.uber.org/zap"
) )
const attributeMultipartObjectSize = "S3-Multipart-Object-Size"
// MultipartFile provides standard ReadCloser interface and also allows one to // MultipartFile provides standard ReadCloser interface and also allows one to
dkirillov marked this conversation as resolved Outdated

Do we really need separate const for that?

Do we really need separate const for that?
// get file name, it's used for multipart uploads. // get file name, it's used for multipart uploads.
type MultipartFile interface { type MultipartFile interface {
@ -45,3 +49,30 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
return part, nil return part, nil
} }
} }
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
cid, ok := p.obj.Header.ContainerID()
if !ok {
return nil, 0, errors.New("no container id set")
}
oid, ok := p.obj.Header.ID()
if !ok {
return nil, 0, errors.New("no object id set")
}
size, err := strconv.ParseUint(p.strSize, 10, 64)
if err != nil {
return nil, 0, err
}
ctx := p.req.RequestCtx
params := PrmInitMultiObjectReader{
Addr: newAddress(cid, oid),
Bearer: bearerToken(ctx),
dkirillov marked this conversation as resolved Outdated

We can omit setting fields with default values

We can omit setting fields with default values
}
payload, err := h.frostfs.InitMultiObjectReader(ctx, params)
if err != nil {
return nil, 0, err
}
return io.NopCloser(payload), size, nil
}

View file

@ -47,20 +47,26 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
return http.DetectContentType(buf), buf, err // to not lose io.EOF return http.DetectContentType(buf), buf, err // to not lose io.EOF
} }
func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) { type getMultiobjectBodyParams struct {
obj *Object
req request
strSize string
}
dkirillov marked this conversation as resolved Outdated

Why do we need bktInfo?

Why do we need bktInfo?
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address) {
var ( var (
err error shouldDownload = req.QueryArgs().GetBool("download")
dis = "inline" start = time.Now()
start = time.Now() filename string
filename string filepath string
filepath string contentType string
) )
prm := PrmObjectGet{ prm := PrmObjectGet{
PrmAuth: PrmAuth{ PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx), BearerToken: bearerToken(ctx),
}, },
Address: objectAddress, Address: objAddress,
} }
rObj, err := h.frostfs.GetObject(ctx, prm) rObj, err := h.frostfs.GetObject(ctx, prm)
@ -70,15 +76,9 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
} }
// we can't close reader in this function, so how to do it? // we can't close reader in this function, so how to do it?
req.setIDs(rObj.Header)
if req.Request.URI().QueryArgs().GetBool("download") { payload := rObj.Payload
dis = "attachment"
}
payloadSize := rObj.Header.PayloadSize() payloadSize := rObj.Header.PayloadSize()
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
var contentType string
for _, attr := range rObj.Header.Attributes() { for _, attr := range rObj.Header.Attributes() {
dkirillov marked this conversation as resolved Outdated

Is it necessary to make such changes?
Can we make diff as small as possible?

Is it necessary to make such changes? Can we make diff as small as possible?
key := attr.Key() key := attr.Key()
val := attr.Value() val := attr.Value()
@ -93,31 +93,41 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
case object.AttributeFileName: case object.AttributeFileName:
filename = val filename = val
case object.AttributeTimestamp: case object.AttributeTimestamp:
value, err := strconv.ParseInt(val, 10, 64) if err = req.setTimestamp(val); err != nil {
if err != nil { req.log.Error(logs.CouldntParseCreationDate,
req.log.Info(logs.CouldntParseCreationDate,
zap.String("key", key),
zap.String("val", val), zap.String("val", val),
zap.Error(err)) zap.Error(err))
continue
} }
req.Response.Header.Set(fasthttp.HeaderLastModified,
time.Unix(value, 0).UTC().Format(http.TimeFormat))
case object.AttributeContentType: case object.AttributeContentType:
contentType = val contentType = val
case object.AttributeFilePath: case object.AttributeFilePath:
filepath = val filepath = val
case attributeMultipartObjectSize:
payload, payloadSize, err = h.getPayload(getMultiobjectBodyParams{
obj: rObj,
req: req,
strSize: val,
dkirillov marked this conversation as resolved Outdated

This if should be placed after for loop. We have not guarantees that attribute FileName occurs before FilePath in rObj.Header.Attributes()

This if should be placed after `for` loop. We have not guarantees that attribute `FileName` occurs before `FilePath` in `rObj.Header.Attributes()`
})
if err != nil {
req.handleFrostFSErr(err, start)
return
}
} }
} }
if filename == "" {
filename = filepath
}
idsToResponse(&req.Response, &rObj.Header) req.setDisposition(shouldDownload, filename)
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
if len(contentType) == 0 { if len(contentType) == 0 {
// determine the Content-Type from the payload head // determine the Content-Type from the payload head
var payloadHead []byte var payloadHead []byte
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) { contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
return rObj.Payload, nil return payload, nil
}) })
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err)) req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
@ -129,20 +139,46 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
var headReader io.Reader = bytes.NewReader(payloadHead) var headReader io.Reader = bytes.NewReader(payloadHead)
if err != io.EOF { // otherwise, we've already read full payload if err != io.EOF { // otherwise, we've already read full payload
headReader = io.MultiReader(headReader, rObj.Payload) headReader = io.MultiReader(headReader, payload)
} }
// note: we could do with io.Reader, but SetBodyStream below closes body stream // note: we could do with io.Reader, but SetBodyStream below closes body stream
// if it implements io.Closer and that's useful for us. // if it implements io.Closer and that's useful for us.
rObj.Payload = readCloser{headReader, rObj.Payload} payload = readCloser{headReader, payload}
} }
req.SetContentType(contentType) req.SetContentType(contentType)
req.Response.SetBodyStream(payload, int(payloadSize))
}
if filename == "" { func (r *request) setIDs(obj object.Object) {
filename = filepath objID, _ := obj.ID()
cnrID, _ := obj.ContainerID()
r.Response.Header.Set(hdrObjectID, objID.String())
r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String())
r.Response.Header.Set(hdrContainerID, cnrID.String())
}
func (r *request) setDisposition(shouldDownload bool, filename string) {
const (
inlineDisposition = "inline"
attachmentDisposition = "attachment"
)
dis := inlineDisposition
if shouldDownload {
dis = attachmentDisposition
} }
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
}
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
func (r *request) setTimestamp(timestamp string) error {
value, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return err
}
r.Response.Header.Set(fasthttp.HeaderLastModified,
time.Unix(value, 0).UTC().Format(http.TimeFormat))
return nil
} }

View file

@ -12,6 +12,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -90,3 +92,10 @@ func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
} }
response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
} }
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(obj)
return addr
}

View file

@ -33,9 +33,9 @@ func NewFrostFS(p *pool.Pool) *FrostFS {
} }
// Container implements frostfs.FrostFS interface method. // Container implements frostfs.FrostFS interface method.
func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer) (*container.Container, error) { func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContainer) (*container.Container, error) {
prm := pool.PrmContainerGet{ prm := pool.PrmContainerGet{
ContainerID: layerPrm.ContainerID, ContainerID: containerPrm.ContainerID,
} }
res, err := x.pool.GetContainer(ctx, prm) res, err := x.pool.GetContainer(ctx, prm)

View file

@ -0,0 +1,241 @@
package frostfs
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// PartInfo is upload information about part.
type PartInfo struct {
Key string `json:"key"`
UploadID string `json:"uploadId"`
Number int `json:"number"`
OID oid.ID `json:"oid"`
dkirillov marked this conversation as resolved Outdated

The field should name OID

The field should name `OID`
Size uint64 `json:"size"`
dkirillov marked this conversation as resolved Outdated

Do we really need BucketInfo? It seems that having CID be enough

Do we really need `BucketInfo`? It seems that having CID be enough
ETag string `json:"etag"`
MD5 string `json:"md5"`
Created time.Time `json:"created"`
}
type GetFrostFSParams struct {
// payload range
Off, Ln uint64
Addr oid.Address
}
type PartObj struct {
OID oid.ID
Size uint64
}
dkirillov marked this conversation as resolved Outdated

This field is unused

This field is unused
type readerInitiator interface {
InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.ReadCloser, error)
dkirillov marked this conversation as resolved Outdated

Why do we use io.Reader rather than io.ReadCloser?
I undesrtand that this is copy-paste from s3-gw. But probably in s3-gw we also should fix it

cc @alexvanin

Why do we use `io.Reader` rather than `io.ReadCloser`? I undesrtand that this is copy-paste from s3-gw. But probably in s3-gw we also should fix it cc @alexvanin
}
// MultiObjectReader implements io.Reader of payloads of the object list stored in the FrostFS network.
type MultiObjectReader struct {
ctx context.Context
layer readerInitiator
startPartOffset uint64
endPartLength uint64
prm GetFrostFSParams
curIndex int
curReader io.ReadCloser
parts []PartObj
}
type MultiObjectReaderConfig struct {
Initiator readerInitiator
// the offset of complete object and total size to read
Off, Ln uint64
Addr oid.Address
Parts []PartObj
}
var (
errOffsetIsOutOfRange = errors.New("offset is out of payload range")
errLengthIsOutOfRange = errors.New("length is out of payload range")
errEmptyPartsList = errors.New("empty parts list")
errorZeroRangeLength = errors.New("zero range length")
)
func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) {
combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{
PrmAuth: handler.PrmAuth{BearerToken: p.Bearer},
Address: p.Addr,
})
if err != nil {
return nil, fmt.Errorf("get combined object '%s': %w", p.Addr.Object().EncodeToString(), err)
}
var parts []*PartInfo
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
}
objParts := make([]PartObj, len(parts))
for i, part := range parts {
objParts[i] = PartObj{
OID: part.OID,
Size: part.Size,
}
}
return NewMultiObjectReader(ctx, MultiObjectReaderConfig{
Initiator: x,
Off: p.Off,
Ln: p.Ln,
Parts: objParts,
Addr: p.Addr,
})
}
func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) {
if len(cfg.Parts) == 0 {
return nil, errEmptyPartsList
}
r := &MultiObjectReader{
ctx: ctx,
layer: cfg.Initiator,
prm: GetFrostFSParams{
Addr: cfg.Addr,
},
parts: cfg.Parts,
}
if cfg.Off+cfg.Ln == 0 {
return r, nil
}
if cfg.Off > 0 && cfg.Ln == 0 {
return nil, errorZeroRangeLength
}
startPartIndex, startPartOffset := findStartPart(cfg)
if startPartIndex == -1 {
return nil, errOffsetIsOutOfRange
}
r.startPartOffset = startPartOffset
endPartIndex, endPartLength := findEndPart(cfg)
if endPartIndex == -1 {
return nil, errLengthIsOutOfRange
}
r.endPartLength = endPartLength
r.parts = cfg.Parts[startPartIndex : endPartIndex+1]
return r, nil
}
func findStartPart(cfg MultiObjectReaderConfig) (index int, offset uint64) {
position := cfg.Off
for i, part := range cfg.Parts {
// Strict inequality when searching for start position to avoid reading zero length part.
if position < part.Size {
return i, position
}
position -= part.Size
}
return -1, 0
}
func findEndPart(cfg MultiObjectReaderConfig) (index int, length uint64) {
position := cfg.Off + cfg.Ln
for i, part := range cfg.Parts {
// Non-strict inequality when searching for end position to avoid out of payload range error.
if position <= part.Size {
return i, position
}
position -= part.Size
}
return -1, 0
}
func (x *MultiObjectReader) Read(p []byte) (n int, err error) {
if x.curReader != nil {
n, err = x.curReader.Read(p)
if err != nil {
if closeErr := x.curReader.Close(); closeErr != nil {
return n, fmt.Errorf("%w (close err: %v)", err, closeErr)
dkirillov marked this conversation as resolved Outdated

I suppose it's better to keep original error something like this (or just log closing error)

if closeErr := x.curReader.Close(); closeErr != nil {
		return n, fmt.Errorf("%w (close err: %v)", err, closeErr)
}
I suppose it's better to keep original error something like this (or just log closing error) ```golang if closeErr := x.curReader.Close(); closeErr != nil { return n, fmt.Errorf("%w (close err: %v)", err, closeErr) } ```
}
}
if !errors.Is(err, io.EOF) {
return n, err
}
x.curIndex++
}
if x.curIndex == len(x.parts) {
return n, io.EOF
}
x.prm.Addr.SetObject(x.parts[x.curIndex].OID)
if x.curIndex == 0 {
x.prm.Off = x.startPartOffset
x.prm.Ln = x.parts[x.curIndex].Size - x.startPartOffset
}
if x.curIndex == len(x.parts)-1 {
x.prm.Ln = x.endPartLength - x.prm.Off
}
x.curReader, err = x.layer.InitFrostFSObjectPayloadReader(x.ctx, x.prm)
if err != nil {
return n, fmt.Errorf("init payload reader for the next part: %w", err)
}
x.prm.Off = 0
x.prm.Ln = 0
next, err := x.Read(p[n:])
return n + next, err
}
// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object.
// Zero range corresponds to full payload (panics if only offset is set).
func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.ReadCloser, error) {
var prmAuth handler.PrmAuth
if p.Off+p.Ln != 0 {
prm := handler.PrmObjectRange{
PrmAuth: prmAuth,
PayloadRange: [2]uint64{p.Off, p.Ln},
Address: p.Addr,
}
return x.RangeObject(ctx, prm)
}
prm := handler.PrmObjectGet{
PrmAuth: prmAuth,
Address: p.Addr,
}
res, err := x.GetObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Payload, nil
}

View file

@ -0,0 +1,137 @@
package frostfs
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"testing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
type readerInitiatorMock struct {
parts map[oid.ID][]byte
}
func (r *readerInitiatorMock) InitFrostFSObjectPayloadReader(_ context.Context, p GetFrostFSParams) (io.ReadCloser, error) {
partPayload, ok := r.parts[p.Addr.Object()]
if !ok {
return nil, errors.New("part not found")
}
if p.Off+p.Ln == 0 {
return io.NopCloser(bytes.NewReader(partPayload)), nil
}
if p.Off > uint64(len(partPayload)-1) {
return nil, fmt.Errorf("invalid offset: %d/%d", p.Off, len(partPayload))
}
if p.Off+p.Ln > uint64(len(partPayload)) {
return nil, fmt.Errorf("invalid range: %d-%d/%d", p.Off, p.Off+p.Ln, len(partPayload))
}
return io.NopCloser(bytes.NewReader(partPayload[p.Off : p.Off+p.Ln])), nil
}
func prepareDataReader() ([]byte, []PartObj, *readerInitiatorMock) {
mockInitReader := &readerInitiatorMock{
parts: map[oid.ID][]byte{
oidtest.ID(): []byte("first part 1"),
oidtest.ID(): []byte("second part 2"),
oidtest.ID(): []byte("third part 3"),
},
}
var fullPayload []byte
parts := make([]PartObj, 0, len(mockInitReader.parts))
for id, payload := range mockInitReader.parts {
parts = append(parts, PartObj{OID: id, Size: uint64(len(payload))})
fullPayload = append(fullPayload, payload...)
}
return fullPayload, parts, mockInitReader
}
func TestMultiReader(t *testing.T) {
ctx := context.Background()
fullPayload, parts, mockInitReader := prepareDataReader()
for _, tc := range []struct {
name string
off uint64
ln uint64
err error
}{
{
name: "simple read all",
},
{
name: "simple read with length",
ln: uint64(len(fullPayload)),
},
{
name: "middle of parts",
off: parts[0].Size + 2,
ln: 4,
},
{
name: "first and second",
off: parts[0].Size - 4,
ln: 8,
},
{
name: "first and third",
off: parts[0].Size - 4,
ln: parts[1].Size + 8,
},
{
name: "second part",
off: parts[0].Size,
ln: parts[1].Size,
},
{
name: "second and third",
off: parts[0].Size,
ln: parts[1].Size + parts[2].Size,
},
{
name: "offset out of range",
off: uint64(len(fullPayload) + 1),
ln: 1,
err: errOffsetIsOutOfRange,
},
{
name: "zero length",
off: parts[1].Size + 1,
ln: 0,
err: errorZeroRangeLength,
},
} {
t.Run(tc.name, func(t *testing.T) {
multiReader, err := NewMultiObjectReader(ctx, MultiObjectReaderConfig{
Initiator: mockInitReader,
Parts: parts,
Off: tc.off,
Ln: tc.ln,
})
require.ErrorIs(t, err, tc.err)
if tc.err == nil {
off := tc.off
ln := tc.ln
if off+ln == 0 {
ln = uint64(len(fullPayload))
}
data, err := io.ReadAll(multiReader)
require.NoError(t, err)
require.Equal(t, fullPayload[off:off+ln], data)
}
})
}
}

View file

@ -1,4 +1,4 @@
package services package frostfs
import ( import (
"context" "context"

View file

@ -21,11 +21,11 @@ func (m nodeMeta) GetValue() []byte {
type nodeResponse struct { type nodeResponse struct {
meta []nodeMeta meta []nodeMeta
timestamp uint64 timestamp []uint64
} }
func (n nodeResponse) GetTimestamp() []uint64 { func (n nodeResponse) GetTimestamp() []uint64 {
return []uint64{n.timestamp} return n.timestamp
} }
func (n nodeResponse) GetMeta() []Meta { func (n nodeResponse) GetMeta() []Meta {
@ -59,7 +59,7 @@ func TestGetLatestNode(t *testing.T) {
name: "one node of the object version", name: "one node of the object version",
nodes: []NodeResponse{ nodes: []NodeResponse{
nodeResponse{ nodeResponse{
timestamp: 1, timestamp: []uint64{1},
meta: []nodeMeta{ meta: []nodeMeta{
{ {
key: oidKV, key: oidKV,
@ -74,11 +74,11 @@ func TestGetLatestNode(t *testing.T) {
name: "one node of the object version and one node of the secondary object", name: "one node of the object version and one node of the secondary object",
nodes: []NodeResponse{ nodes: []NodeResponse{
nodeResponse{ nodeResponse{
timestamp: 3, timestamp: []uint64{3},
meta: []nodeMeta{}, meta: []nodeMeta{},
}, },
nodeResponse{ nodeResponse{
timestamp: 1, timestamp: []uint64{1},
meta: []nodeMeta{ meta: []nodeMeta{
{ {
key: oidKV, key: oidKV,
@ -93,11 +93,11 @@ func TestGetLatestNode(t *testing.T) {
name: "all nodes represent a secondary object", name: "all nodes represent a secondary object",
nodes: []NodeResponse{ nodes: []NodeResponse{
nodeResponse{ nodeResponse{
timestamp: 3, timestamp: []uint64{3},
meta: []nodeMeta{}, meta: []nodeMeta{},
}, },
nodeResponse{ nodeResponse{
timestamp: 5, timestamp: []uint64{5},
meta: []nodeMeta{}, meta: []nodeMeta{},
}, },
}, },
@ -107,7 +107,7 @@ func TestGetLatestNode(t *testing.T) {
name: "several nodes of different types and with different timestamp", name: "several nodes of different types and with different timestamp",
nodes: []NodeResponse{ nodes: []NodeResponse{
nodeResponse{ nodeResponse{
timestamp: 1, timestamp: []uint64{1},
meta: []nodeMeta{ meta: []nodeMeta{
{ {
key: oidKV, key: oidKV,
@ -116,11 +116,11 @@ func TestGetLatestNode(t *testing.T) {
}, },
}, },
nodeResponse{ nodeResponse{
timestamp: 3, timestamp: []uint64{3},
meta: []nodeMeta{}, meta: []nodeMeta{},
}, },
nodeResponse{ nodeResponse{
timestamp: 4, timestamp: []uint64{4},
meta: []nodeMeta{ meta: []nodeMeta{
{ {
key: oidKV, key: oidKV,
@ -129,7 +129,7 @@ func TestGetLatestNode(t *testing.T) {
}, },
}, },
nodeResponse{ nodeResponse{
timestamp: 6, timestamp: []uint64{6},
meta: []nodeMeta{}, meta: []nodeMeta{},
}, },
}, },