[#142] Fix multipart-objects download #143
11 changed files with 517 additions and 62 deletions
|
@ -19,11 +19,10 @@ import (
|
||||||
|
|
||||||
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
|
@ -453,7 +452,7 @@ func (a *app) setHealthStatus() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) Serve() {
|
func (a *app) Serve() {
|
||||||
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(services.NewPoolWrapper(a.treePool)))
|
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)))
|
||||||
|
|
||||||
// Configure router.
|
// Configure router.
|
||||||
a.configureRouter(handler)
|
a.configureRouter(handler)
|
||||||
|
|
|
@ -229,6 +229,10 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (Res
|
||||||
return &resObjectSearchMock{res: res}, nil
|
return &resObjectSearchMock{res: res}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) InitMultiObjectReader(context.Context, PrmInitMultiObjectReader) (io.Reader, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
||||||
for _, attr := range attributes {
|
for _, attr := range attributes {
|
||||||
if attr.Key() == filter.Header() {
|
if attr.Key() == filter.Header() {
|
||||||
|
@ -269,10 +273,3 @@ func (t *TestFrostFS) isAllowed(cnrID cid.ID, userID user.ID, op acl.Op, objID o
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
|
||||||
var addr oid.Address
|
|
||||||
addr.SetContainer(cnr)
|
|
||||||
addr.SetObject(obj)
|
|
||||||
return addr
|
|
||||||
}
|
|
||||||
|
|
|
@ -119,6 +119,14 @@ type PrmObjectSearch struct {
|
||||||
Filters object.SearchFilters
|
Filters object.SearchFilters
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PrmInitMultiObjectReader struct {
|
||||||
|
// payload range
|
||||||
|
Off, Ln uint64
|
||||||
|
|
||||||
|
Addr oid.Address
|
||||||
dkirillov marked this conversation as resolved
Outdated
|
|||||||
|
Bearer *bearer.Token
|
||||||
|
}
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we need `Log`?
|
|||||||
|
|
||||||
type ResObjectSearch interface {
|
type ResObjectSearch interface {
|
||||||
Read(buf []oid.ID) (int, error)
|
Read(buf []oid.ID) (int, error)
|
||||||
Iterate(f func(oid.ID) bool) error
|
Iterate(f func(oid.ID) bool) error
|
||||||
|
@ -140,6 +148,8 @@ type FrostFS interface {
|
||||||
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
||||||
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
||||||
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
||||||
|
InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error)
|
||||||
|
|
||||||
utils.EpochInfoFetcher
|
utils.EpochInfoFetcher
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,9 +211,7 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var addr oid.Address
|
addr := newAddress(bktInfo.CID, *objID)
|
||||||
addr.SetContainer(bktInfo.CID)
|
|
||||||
addr.SetObject(*objID)
|
|
||||||
|
|
||||||
f(ctx, *h.newRequest(c, log), addr)
|
f(ctx, *h.newRequest(c, log), addr)
|
||||||
}
|
}
|
||||||
|
@ -256,10 +264,7 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
|
||||||
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
addr := newAddress(bktInfo.CID, foundOid.OID)
|
||||||
var addr oid.Address
|
|
||||||
addr.SetContainer(bktInfo.CID)
|
|
||||||
addr.SetObject(foundOid.OID)
|
|
||||||
|
|
||||||
f(ctx, *h.newRequest(c, log), addr)
|
f(ctx, *h.newRequest(c, log), addr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,17 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const attributeMultipartObjectSize = "S3-Multipart-Object-Size"
|
||||||
|
|
||||||
// MultipartFile provides standard ReadCloser interface and also allows one to
|
// MultipartFile provides standard ReadCloser interface and also allows one to
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Do we really need a separate const for that?
|
|||||||
// get file name, it's used for multipart uploads.
|
// get file name, it's used for multipart uploads.
|
||||||
type MultipartFile interface {
|
type MultipartFile interface {
|
||||||
|
@ -45,3 +49,30 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
||||||
return part, nil
|
return part, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
|
||||||
|
func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
|
||||||
|
cid, ok := p.obj.Header.ContainerID()
|
||||||
|
if !ok {
|
||||||
|
return nil, 0, errors.New("no container id set")
|
||||||
|
}
|
||||||
|
oid, ok := p.obj.Header.ID()
|
||||||
|
if !ok {
|
||||||
|
return nil, 0, errors.New("no object id set")
|
||||||
|
}
|
||||||
|
size, err := strconv.ParseUint(p.strSize, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
ctx := p.req.RequestCtx
|
||||||
|
params := PrmInitMultiObjectReader{
|
||||||
|
Addr: newAddress(cid, oid),
|
||||||
|
Bearer: bearerToken(ctx),
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
We can omit setting fields with default values.
|
|||||||
|
}
|
||||||
|
payload, err := h.frostfs.InitMultiObjectReader(ctx, params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return io.NopCloser(payload), size, nil
|
||||||
|
}
|
||||||
|
|
|
@ -47,20 +47,26 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
|
||||||
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) {
|
type getMultiobjectBodyParams struct {
|
||||||
|
obj *Object
|
||||||
|
req request
|
||||||
|
strSize string
|
||||||
|
}
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we need `bktInfo`?
|
|||||||
|
|
||||||
|
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address) {
|
||||||
var (
|
var (
|
||||||
err error
|
shouldDownload = req.QueryArgs().GetBool("download")
|
||||||
dis = "inline"
|
start = time.Now()
|
||||||
start = time.Now()
|
filename string
|
||||||
filename string
|
filepath string
|
||||||
filepath string
|
contentType string
|
||||||
)
|
)
|
||||||
|
|
||||||
prm := PrmObjectGet{
|
prm := PrmObjectGet{
|
||||||
PrmAuth: PrmAuth{
|
PrmAuth: PrmAuth{
|
||||||
BearerToken: bearerToken(ctx),
|
BearerToken: bearerToken(ctx),
|
||||||
},
|
},
|
||||||
Address: objectAddress,
|
Address: objAddress,
|
||||||
}
|
}
|
||||||
|
|
||||||
rObj, err := h.frostfs.GetObject(ctx, prm)
|
rObj, err := h.frostfs.GetObject(ctx, prm)
|
||||||
|
@ -70,15 +76,9 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
||||||
}
|
}
|
||||||
|
|
||||||
// we can't close reader in this function, so how to do it?
|
// we can't close reader in this function, so how to do it?
|
||||||
|
req.setIDs(rObj.Header)
|
||||||
if req.Request.URI().QueryArgs().GetBool("download") {
|
payload := rObj.Payload
|
||||||
dis = "attachment"
|
|
||||||
}
|
|
||||||
|
|
||||||
payloadSize := rObj.Header.PayloadSize()
|
payloadSize := rObj.Header.PayloadSize()
|
||||||
|
|
||||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
|
||||||
var contentType string
|
|
||||||
for _, attr := range rObj.Header.Attributes() {
|
for _, attr := range rObj.Header.Attributes() {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Is it necessary to make such changes?
Can we make the diff as small as possible?
|
|||||||
key := attr.Key()
|
key := attr.Key()
|
||||||
val := attr.Value()
|
val := attr.Value()
|
||||||
|
@ -93,31 +93,41 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
||||||
case object.AttributeFileName:
|
case object.AttributeFileName:
|
||||||
filename = val
|
filename = val
|
||||||
case object.AttributeTimestamp:
|
case object.AttributeTimestamp:
|
||||||
value, err := strconv.ParseInt(val, 10, 64)
|
if err = req.setTimestamp(val); err != nil {
|
||||||
if err != nil {
|
req.log.Error(logs.CouldntParseCreationDate,
|
||||||
req.log.Info(logs.CouldntParseCreationDate,
|
|
||||||
zap.String("key", key),
|
|
||||||
zap.String("val", val),
|
zap.String("val", val),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
req.Response.Header.Set(fasthttp.HeaderLastModified,
|
|
||||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
|
||||||
case object.AttributeContentType:
|
case object.AttributeContentType:
|
||||||
contentType = val
|
contentType = val
|
||||||
case object.AttributeFilePath:
|
case object.AttributeFilePath:
|
||||||
filepath = val
|
filepath = val
|
||||||
|
case attributeMultipartObjectSize:
|
||||||
|
payload, payloadSize, err = h.getPayload(getMultiobjectBodyParams{
|
||||||
|
obj: rObj,
|
||||||
|
req: req,
|
||||||
|
strSize: val,
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
This `if` should be placed after the `for` loop. We have no guarantee that attribute `FileName` occurs before `FilePath` in `rObj.Header.Attributes()`.
|
|||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
req.handleFrostFSErr(err, start)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if filename == "" {
|
||||||
|
filename = filepath
|
||||||
|
}
|
||||||
|
|
||||||
idsToResponse(&req.Response, &rObj.Header)
|
req.setDisposition(shouldDownload, filename)
|
||||||
|
|
||||||
|
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||||
|
|
||||||
if len(contentType) == 0 {
|
if len(contentType) == 0 {
|
||||||
// determine the Content-Type from the payload head
|
// determine the Content-Type from the payload head
|
||||||
var payloadHead []byte
|
var payloadHead []byte
|
||||||
|
|
||||||
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
|
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
|
||||||
return rObj.Payload, nil
|
return payload, nil
|
||||||
})
|
})
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
||||||
|
@ -129,20 +139,46 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
||||||
var headReader io.Reader = bytes.NewReader(payloadHead)
|
var headReader io.Reader = bytes.NewReader(payloadHead)
|
||||||
|
|
||||||
if err != io.EOF { // otherwise, we've already read full payload
|
if err != io.EOF { // otherwise, we've already read full payload
|
||||||
headReader = io.MultiReader(headReader, rObj.Payload)
|
headReader = io.MultiReader(headReader, payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
// note: we could do with io.Reader, but SetBodyStream below closes body stream
|
// note: we could do with io.Reader, but SetBodyStream below closes body stream
|
||||||
// if it implements io.Closer and that's useful for us.
|
// if it implements io.Closer and that's useful for us.
|
||||||
rObj.Payload = readCloser{headReader, rObj.Payload}
|
payload = readCloser{headReader, payload}
|
||||||
}
|
}
|
||||||
req.SetContentType(contentType)
|
req.SetContentType(contentType)
|
||||||
|
req.Response.SetBodyStream(payload, int(payloadSize))
|
||||||
|
}
|
||||||
|
|
||||||
if filename == "" {
|
func (r *request) setIDs(obj object.Object) {
|
||||||
filename = filepath
|
objID, _ := obj.ID()
|
||||||
|
cnrID, _ := obj.ContainerID()
|
||||||
|
r.Response.Header.Set(hdrObjectID, objID.String())
|
||||||
|
r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String())
|
||||||
|
r.Response.Header.Set(hdrContainerID, cnrID.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) setDisposition(shouldDownload bool, filename string) {
|
||||||
|
const (
|
||||||
|
inlineDisposition = "inline"
|
||||||
|
attachmentDisposition = "attachment"
|
||||||
|
)
|
||||||
|
|
||||||
|
dis := inlineDisposition
|
||||||
|
if shouldDownload {
|
||||||
|
dis = attachmentDisposition
|
||||||
}
|
}
|
||||||
|
|
||||||
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||||
|
}
|
||||||
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
|
||||||
|
func (r *request) setTimestamp(timestamp string) error {
|
||||||
|
value, err := strconv.ParseInt(timestamp, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||||
|
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,8 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -90,3 +92,10 @@ func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
|
||||||
}
|
}
|
||||||
response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(cnr)
|
||||||
|
addr.SetObject(obj)
|
||||||
|
return addr
|
||||||
|
}
|
||||||
|
|
|
@ -33,9 +33,9 @@ func NewFrostFS(p *pool.Pool) *FrostFS {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Container implements frostfs.FrostFS interface method.
|
// Container implements frostfs.FrostFS interface method.
|
||||||
func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer) (*container.Container, error) {
|
func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContainer) (*container.Container, error) {
|
||||||
prm := pool.PrmContainerGet{
|
prm := pool.PrmContainerGet{
|
||||||
ContainerID: layerPrm.ContainerID,
|
ContainerID: containerPrm.ContainerID,
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := x.pool.GetContainer(ctx, prm)
|
res, err := x.pool.GetContainer(ctx, prm)
|
241
internal/service/frostfs/multi_object_reader.go
Normal file
241
internal/service/frostfs/multi_object_reader.go
Normal file
|
@ -0,0 +1,241 @@
|
||||||
|
package frostfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PartInfo is upload information about part.
|
||||||
|
type PartInfo struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
UploadID string `json:"uploadId"`
|
||||||
|
Number int `json:"number"`
|
||||||
|
OID oid.ID `json:"oid"`
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
The field should be named `OID`.
|
|||||||
|
Size uint64 `json:"size"`
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Do we really need `BucketInfo`? It seems that having the CID would be enough.
|
|||||||
|
ETag string `json:"etag"`
|
||||||
|
MD5 string `json:"md5"`
|
||||||
|
Created time.Time `json:"created"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetFrostFSParams struct {
|
||||||
|
// payload range
|
||||||
|
Off, Ln uint64
|
||||||
|
Addr oid.Address
|
||||||
|
}
|
||||||
|
|
||||||
|
type PartObj struct {
|
||||||
|
OID oid.ID
|
||||||
|
Size uint64
|
||||||
|
}
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
This field is unused.
|
|||||||
|
|
||||||
|
type readerInitiator interface {
|
||||||
|
InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.ReadCloser, error)
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we use `io.Reader` rather than `io.ReadCloser`?
I understand that this is a copy-paste from s3-gw, but we should probably fix it in s3-gw as well.
cc @alexvanin
|
|||||||
|
}
|
||||||
|
|
||||||
|
// MultiObjectReader implements io.Reader of payloads of the object list stored in the FrostFS network.
|
||||||
|
type MultiObjectReader struct {
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
|
layer readerInitiator
|
||||||
|
|
||||||
|
startPartOffset uint64
|
||||||
|
endPartLength uint64
|
||||||
|
|
||||||
|
prm GetFrostFSParams
|
||||||
|
|
||||||
|
curIndex int
|
||||||
|
curReader io.ReadCloser
|
||||||
|
|
||||||
|
parts []PartObj
|
||||||
|
}
|
||||||
|
|
||||||
|
type MultiObjectReaderConfig struct {
|
||||||
|
Initiator readerInitiator
|
||||||
|
|
||||||
|
// the offset of complete object and total size to read
|
||||||
|
Off, Ln uint64
|
||||||
|
|
||||||
|
Addr oid.Address
|
||||||
|
Parts []PartObj
|
||||||
|
}
|
||||||
|
|
||||||
|
// Errors returned by NewMultiObjectReader when the requested range cannot be
// mapped onto the list of parts.
var (
	// errOffsetIsOutOfRange: the range start lies beyond the last part.
	errOffsetIsOutOfRange = errors.New("offset is out of payload range")
	// errLengthIsOutOfRange: the range end lies beyond the last part.
	errLengthIsOutOfRange = errors.New("length is out of payload range")
	// errEmptyPartsList: no parts were provided.
	errEmptyPartsList = errors.New("empty parts list")
	// errorZeroRangeLength: a non-zero offset was given with zero length.
	errorZeroRangeLength = errors.New("zero range length")
)
|
||||||
|
|
||||||
|
func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) {
|
||||||
|
combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{
|
||||||
|
PrmAuth: handler.PrmAuth{BearerToken: p.Bearer},
|
||||||
|
Address: p.Addr,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get combined object '%s': %w", p.Addr.Object().EncodeToString(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var parts []*PartInfo
|
||||||
|
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objParts := make([]PartObj, len(parts))
|
||||||
|
for i, part := range parts {
|
||||||
|
objParts[i] = PartObj{
|
||||||
|
OID: part.OID,
|
||||||
|
Size: part.Size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewMultiObjectReader(ctx, MultiObjectReaderConfig{
|
||||||
|
Initiator: x,
|
||||||
|
Off: p.Off,
|
||||||
|
Ln: p.Ln,
|
||||||
|
Parts: objParts,
|
||||||
|
Addr: p.Addr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) {
|
||||||
|
if len(cfg.Parts) == 0 {
|
||||||
|
return nil, errEmptyPartsList
|
||||||
|
}
|
||||||
|
|
||||||
|
r := &MultiObjectReader{
|
||||||
|
ctx: ctx,
|
||||||
|
layer: cfg.Initiator,
|
||||||
|
prm: GetFrostFSParams{
|
||||||
|
Addr: cfg.Addr,
|
||||||
|
},
|
||||||
|
parts: cfg.Parts,
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.Off+cfg.Ln == 0 {
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.Off > 0 && cfg.Ln == 0 {
|
||||||
|
return nil, errorZeroRangeLength
|
||||||
|
}
|
||||||
|
|
||||||
|
startPartIndex, startPartOffset := findStartPart(cfg)
|
||||||
|
if startPartIndex == -1 {
|
||||||
|
return nil, errOffsetIsOutOfRange
|
||||||
|
}
|
||||||
|
r.startPartOffset = startPartOffset
|
||||||
|
|
||||||
|
endPartIndex, endPartLength := findEndPart(cfg)
|
||||||
|
if endPartIndex == -1 {
|
||||||
|
return nil, errLengthIsOutOfRange
|
||||||
|
}
|
||||||
|
r.endPartLength = endPartLength
|
||||||
|
|
||||||
|
r.parts = cfg.Parts[startPartIndex : endPartIndex+1]
|
||||||
|
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func findStartPart(cfg MultiObjectReaderConfig) (index int, offset uint64) {
|
||||||
|
position := cfg.Off
|
||||||
|
for i, part := range cfg.Parts {
|
||||||
|
// Strict inequality when searching for start position to avoid reading zero length part.
|
||||||
|
if position < part.Size {
|
||||||
|
return i, position
|
||||||
|
}
|
||||||
|
position -= part.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func findEndPart(cfg MultiObjectReaderConfig) (index int, length uint64) {
|
||||||
|
position := cfg.Off + cfg.Ln
|
||||||
|
for i, part := range cfg.Parts {
|
||||||
|
// Non-strict inequality when searching for end position to avoid out of payload range error.
|
||||||
|
if position <= part.Size {
|
||||||
|
return i, position
|
||||||
|
}
|
||||||
|
position -= part.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *MultiObjectReader) Read(p []byte) (n int, err error) {
|
||||||
|
if x.curReader != nil {
|
||||||
|
n, err = x.curReader.Read(p)
|
||||||
|
if err != nil {
|
||||||
|
if closeErr := x.curReader.Close(); closeErr != nil {
|
||||||
|
return n, fmt.Errorf("%w (close err: %v)", err, closeErr)
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
I suppose it's better to keep the original error, something like this (or just log the closing error):
```golang
if closeErr := x.curReader.Close(); closeErr != nil {
return n, fmt.Errorf("%w (close err: %v)", err, closeErr)
}
```
|
|||||||
|
}
|
||||||
|
}
|
||||||
|
if !errors.Is(err, io.EOF) {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
x.curIndex++
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.curIndex == len(x.parts) {
|
||||||
|
return n, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
x.prm.Addr.SetObject(x.parts[x.curIndex].OID)
|
||||||
|
|
||||||
|
if x.curIndex == 0 {
|
||||||
|
x.prm.Off = x.startPartOffset
|
||||||
|
x.prm.Ln = x.parts[x.curIndex].Size - x.startPartOffset
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.curIndex == len(x.parts)-1 {
|
||||||
|
x.prm.Ln = x.endPartLength - x.prm.Off
|
||||||
|
}
|
||||||
|
|
||||||
|
x.curReader, err = x.layer.InitFrostFSObjectPayloadReader(x.ctx, x.prm)
|
||||||
|
if err != nil {
|
||||||
|
return n, fmt.Errorf("init payload reader for the next part: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
x.prm.Off = 0
|
||||||
|
x.prm.Ln = 0
|
||||||
|
|
||||||
|
next, err := x.Read(p[n:])
|
||||||
|
|
||||||
|
return n + next, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object.
|
||||||
|
// Zero range corresponds to full payload (panics if only offset is set).
|
||||||
|
func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.ReadCloser, error) {
|
||||||
|
var prmAuth handler.PrmAuth
|
||||||
|
|
||||||
|
if p.Off+p.Ln != 0 {
|
||||||
|
prm := handler.PrmObjectRange{
|
||||||
|
PrmAuth: prmAuth,
|
||||||
|
PayloadRange: [2]uint64{p.Off, p.Ln},
|
||||||
|
Address: p.Addr,
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.RangeObject(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := handler.PrmObjectGet{
|
||||||
|
PrmAuth: prmAuth,
|
||||||
|
Address: p.Addr,
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := x.GetObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Payload, nil
|
||||||
|
}
|
137
internal/service/frostfs/multi_object_reader_test.go
Normal file
137
internal/service/frostfs/multi_object_reader_test.go
Normal file
|
@ -0,0 +1,137 @@
|
||||||
|
package frostfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type readerInitiatorMock struct {
|
||||||
|
parts map[oid.ID][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *readerInitiatorMock) InitFrostFSObjectPayloadReader(_ context.Context, p GetFrostFSParams) (io.ReadCloser, error) {
|
||||||
|
partPayload, ok := r.parts[p.Addr.Object()]
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("part not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Off+p.Ln == 0 {
|
||||||
|
return io.NopCloser(bytes.NewReader(partPayload)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Off > uint64(len(partPayload)-1) {
|
||||||
|
return nil, fmt.Errorf("invalid offset: %d/%d", p.Off, len(partPayload))
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Off+p.Ln > uint64(len(partPayload)) {
|
||||||
|
return nil, fmt.Errorf("invalid range: %d-%d/%d", p.Off, p.Off+p.Ln, len(partPayload))
|
||||||
|
}
|
||||||
|
|
||||||
|
return io.NopCloser(bytes.NewReader(partPayload[p.Off : p.Off+p.Ln])), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareDataReader() ([]byte, []PartObj, *readerInitiatorMock) {
|
||||||
|
mockInitReader := &readerInitiatorMock{
|
||||||
|
parts: map[oid.ID][]byte{
|
||||||
|
oidtest.ID(): []byte("first part 1"),
|
||||||
|
oidtest.ID(): []byte("second part 2"),
|
||||||
|
oidtest.ID(): []byte("third part 3"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var fullPayload []byte
|
||||||
|
parts := make([]PartObj, 0, len(mockInitReader.parts))
|
||||||
|
for id, payload := range mockInitReader.parts {
|
||||||
|
parts = append(parts, PartObj{OID: id, Size: uint64(len(payload))})
|
||||||
|
fullPayload = append(fullPayload, payload...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fullPayload, parts, mockInitReader
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiReader(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
fullPayload, parts, mockInitReader := prepareDataReader()
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
off uint64
|
||||||
|
ln uint64
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple read all",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "simple read with length",
|
||||||
|
ln: uint64(len(fullPayload)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "middle of parts",
|
||||||
|
off: parts[0].Size + 2,
|
||||||
|
ln: 4,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "first and second",
|
||||||
|
off: parts[0].Size - 4,
|
||||||
|
ln: 8,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "first and third",
|
||||||
|
off: parts[0].Size - 4,
|
||||||
|
ln: parts[1].Size + 8,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "second part",
|
||||||
|
off: parts[0].Size,
|
||||||
|
ln: parts[1].Size,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "second and third",
|
||||||
|
off: parts[0].Size,
|
||||||
|
ln: parts[1].Size + parts[2].Size,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "offset out of range",
|
||||||
|
off: uint64(len(fullPayload) + 1),
|
||||||
|
ln: 1,
|
||||||
|
err: errOffsetIsOutOfRange,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "zero length",
|
||||||
|
off: parts[1].Size + 1,
|
||||||
|
ln: 0,
|
||||||
|
err: errorZeroRangeLength,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
multiReader, err := NewMultiObjectReader(ctx, MultiObjectReaderConfig{
|
||||||
|
Initiator: mockInitReader,
|
||||||
|
Parts: parts,
|
||||||
|
Off: tc.off,
|
||||||
|
Ln: tc.ln,
|
||||||
|
})
|
||||||
|
require.ErrorIs(t, err, tc.err)
|
||||||
|
|
||||||
|
if tc.err == nil {
|
||||||
|
off := tc.off
|
||||||
|
ln := tc.ln
|
||||||
|
if off+ln == 0 {
|
||||||
|
ln = uint64(len(fullPayload))
|
||||||
|
}
|
||||||
|
data, err := io.ReadAll(multiReader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, fullPayload[off:off+ln], data)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package services
|
package frostfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
|
@ -21,11 +21,11 @@ func (m nodeMeta) GetValue() []byte {
|
||||||
|
|
||||||
type nodeResponse struct {
|
type nodeResponse struct {
|
||||||
meta []nodeMeta
|
meta []nodeMeta
|
||||||
timestamp uint64
|
timestamp []uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n nodeResponse) GetTimestamp() []uint64 {
|
func (n nodeResponse) GetTimestamp() []uint64 {
|
||||||
return []uint64{n.timestamp}
|
return n.timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n nodeResponse) GetMeta() []Meta {
|
func (n nodeResponse) GetMeta() []Meta {
|
||||||
|
@ -59,7 +59,7 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
name: "one node of the object version",
|
name: "one node of the object version",
|
||||||
nodes: []NodeResponse{
|
nodes: []NodeResponse{
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 1,
|
timestamp: []uint64{1},
|
||||||
meta: []nodeMeta{
|
meta: []nodeMeta{
|
||||||
{
|
{
|
||||||
key: oidKV,
|
key: oidKV,
|
||||||
|
@ -74,11 +74,11 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
name: "one node of the object version and one node of the secondary object",
|
name: "one node of the object version and one node of the secondary object",
|
||||||
nodes: []NodeResponse{
|
nodes: []NodeResponse{
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 3,
|
timestamp: []uint64{3},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 1,
|
timestamp: []uint64{1},
|
||||||
meta: []nodeMeta{
|
meta: []nodeMeta{
|
||||||
{
|
{
|
||||||
key: oidKV,
|
key: oidKV,
|
||||||
|
@ -93,11 +93,11 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
name: "all nodes represent a secondary object",
|
name: "all nodes represent a secondary object",
|
||||||
nodes: []NodeResponse{
|
nodes: []NodeResponse{
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 3,
|
timestamp: []uint64{3},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 5,
|
timestamp: []uint64{5},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -107,7 +107,7 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
name: "several nodes of different types and with different timestamp",
|
name: "several nodes of different types and with different timestamp",
|
||||||
nodes: []NodeResponse{
|
nodes: []NodeResponse{
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 1,
|
timestamp: []uint64{1},
|
||||||
meta: []nodeMeta{
|
meta: []nodeMeta{
|
||||||
{
|
{
|
||||||
key: oidKV,
|
key: oidKV,
|
||||||
|
@ -116,11 +116,11 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 3,
|
timestamp: []uint64{3},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 4,
|
timestamp: []uint64{4},
|
||||||
meta: []nodeMeta{
|
meta: []nodeMeta{
|
||||||
{
|
{
|
||||||
key: oidKV,
|
key: oidKV,
|
||||||
|
@ -129,7 +129,7 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 6,
|
timestamp: []uint64{6},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
Loading…
Add table
Reference in a new issue
Why do we need `ObjectInfo`? It seems we can keep only the object address. Also, `BktInfo` is unnecessary.