From 0b03f693a2d049390d65303077c6318054ea5350 Mon Sep 17 00:00:00 2001 From: Nikita Zinkevich Date: Wed, 18 Sep 2024 07:35:26 +0300 Subject: [PATCH] [#142] Fix multipart-objects download Signed-off-by: Nikita Zinkevich --- cmd/http-gw/app.go | 5 +- internal/data/object.go | 41 +++ internal/handler/frostfs_mock.go | 4 + internal/handler/handler.go | 24 +- internal/handler/head.go | 3 +- internal/handler/multipart.go | 50 ++++ internal/handler/reader.go | 107 ++++---- internal/handler/utils.go | 50 ++++ internal/{ => service}/frostfs/frostfs.go | 4 +- .../service/frostfs/multi_object_reader.go | 234 ++++++++++++++++++ .../frostfs/multi_object_reader_test.go | 137 ++++++++++ .../frostfs}/pool_wrapper.go | 2 +- tree/tree_test.go | 22 +- 13 files changed, 604 insertions(+), 79 deletions(-) create mode 100644 internal/data/object.go rename internal/{ => service}/frostfs/frostfs.go (97%) create mode 100644 internal/service/frostfs/multi_object_reader.go create mode 100644 internal/service/frostfs/multi_object_reader_test.go rename internal/{frostfs/services => service/frostfs}/pool_wrapper.go (99%) diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go index 8d5930d..f8300ec 100644 --- a/cmd/http-gw/app.go +++ b/cmd/http-gw/app.go @@ -19,11 +19,10 @@ import ( v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" @@ -453,7 +452,7 @@ func (a *app) setHealthStatus() { } func (a *app) Serve() { - handler := handler.New(a.AppParams(), a.settings, tree.NewTree(services.NewPoolWrapper(a.treePool))) + handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool))) // Configure router. a.configureRouter(handler) diff --git a/internal/data/object.go b/internal/data/object.go new file mode 100644 index 0000000..2575218 --- /dev/null +++ b/internal/data/object.go @@ -0,0 +1,41 @@ +package data + +import ( + "time" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type ( + ObjectInfo struct { + ID oid.ID + CID cid.ID + + Bucket string + Name string + Size uint64 + Headers map[string]string + } + + // PartInfo is upload information about part. + PartInfo struct { + Key string `json:"key"` + UploadID string `json:"uploadId"` + Number int `json:"number"` + OID oid.ID `json:"oid"` + Size uint64 `json:"size"` + ETag string `json:"etag"` + MD5 string `json:"md5"` + Created time.Time `json:"created"` + } +) + +// Address returns object address. +func (o *ObjectInfo) Address() oid.Address { + var addr oid.Address + addr.SetContainer(o.CID) + addr.SetObject(o.ID) + + return addr +} diff --git a/internal/handler/frostfs_mock.go b/internal/handler/frostfs_mock.go index 9f4378a..c99a588 100644 --- a/internal/handler/frostfs_mock.go +++ b/internal/handler/frostfs_mock.go @@ -229,6 +229,10 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (Res return &resObjectSearchMock{res: res}, nil } +func (t *TestFrostFS) InitMultiObjectReader(context.Context, PrmInitMultiObjectReader) (io.Reader, error) { + return nil, nil +} + func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool { for _, attr := range attributes { if attr.Key() == filter.Header() { diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 8b07af0..64be9a2 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -119,6 +119,16 @@ type PrmObjectSearch struct { Filters object.SearchFilters } +type PrmInitMultiObjectReader struct { + // payload range + Off, Ln uint64 + + ObjInfo *data.ObjectInfo + BktInfo *data.BucketInfo + Log *zap.Logger + Bearer *bearer.Token +} + type ResObjectSearch interface { Read(buf []oid.ID) (int, error) Iterate(f func(oid.ID) bool) error @@ -140,6 +150,8 @@ type FrostFS interface { RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error) CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error) + InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error) + utils.EpochInfoFetcher } @@ -179,7 +191,7 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler { // byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // prepares request and object address to it. -func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) { var ( idCnr, _ = c.UserValue("cid").(string) idObj, _ = c.UserValue("oid").(string) @@ -205,12 +217,12 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ addr.SetContainer(bktInfo.CID) addr.SetObject(*objID) - f(ctx, *h.newRequest(c, log), addr) + f(ctx, *h.newRequest(c, log), addr, bktInfo) } // byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that // prepares request and object address to it. -func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) { var ( bucketname = c.UserValue("cid").(string) key = c.UserValue("oid").(string) @@ -261,11 +273,11 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r addr.SetContainer(bktInfo.CID) addr.SetObject(foundOid.OID) - f(ctx, *h.newRequest(c, log), addr) + f(ctx, *h.newRequest(c, log), addr, bktInfo) } // byAttribute is a wrapper similar to byAddress. -func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) { scid, _ := c.UserValue("cid").(string) key, _ := c.UserValue("attr_key").(string) val, _ := c.UserValue("attr_val").(string) @@ -322,7 +334,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re addrObj.SetContainer(bktInfo.CID) addrObj.SetObject(buf[0]) - f(ctx, *h.newRequest(c, log), addrObj) + f(ctx, *h.newRequest(c, log), addrObj, bktInfo) } // resolveContainer decode container id, if it's not a valid container id diff --git a/internal/handler/head.go b/internal/handler/head.go index f0a1e94..3f8cf92 100644 --- a/internal/handler/head.go +++ b/internal/handler/head.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -24,7 +25,7 @@ const ( hdrContainerID = "X-Container-Id" ) -func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) { +func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address, _ *data.BucketInfo) { var start = time.Now() btoken := bearerToken(ctx) diff --git a/internal/handler/multipart.go b/internal/handler/multipart.go index de9242f..d470e14 100644 --- a/internal/handler/multipart.go +++ b/internal/handler/multipart.go @@ -1,13 +1,21 @@ package handler import ( + "errors" "io" + "strconv" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "go.uber.org/zap" ) +const ( + frostFSSystemMetadataPrefix = "S3-" + attributeMultipartObjectSize = frostFSSystemMetadataPrefix + "Multipart-Object-Size" +) + // MultipartFile provides standard ReadCloser interface and also allows one to // get file name, it's used for multipart uploads. type MultipartFile interface { @@ -45,3 +53,45 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF return part, nil } } + +// getPayload returns initial payload if object is not multipart else composes new reader with parts data. +func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) { + sizeValue, ok := p.attrs[attributeMultipartObjectSize] + if !ok { + return p.obj.Payload, p.obj.Header.PayloadSize(), nil + } + cid, ok := p.obj.Header.ContainerID() + if !ok { + return nil, 0, errors.New("no container id set") + } + oid, ok := p.obj.Header.ID() + if !ok { + return nil, 0, errors.New("no object id set") + } + size, err := strconv.ParseUint(sizeValue, 10, 64) + if err != nil { + return nil, 0, err + } + ctx := p.req.RequestCtx + params := PrmInitMultiObjectReader{ + Off: 0, + Ln: 0, + ObjInfo: &data.ObjectInfo{ + ID: oid, + CID: cid, + Bucket: p.bktinfo.Name, + Name: p.attrs["FilePath"], + Size: size, + Headers: p.attrs, + }, + BktInfo: p.bktinfo, + Log: h.log, + Bearer: bearerToken(ctx), + } + payload, err := h.frostfs.InitMultiObjectReader(ctx, params) + if err != nil { + return nil, 0, err + } + + return io.NopCloser(payload), size, nil +} diff --git a/internal/handler/reader.go b/internal/handler/reader.go index d901f25..a148361 100644 --- a/internal/handler/reader.go +++ b/internal/handler/reader.go @@ -5,10 +5,10 @@ import ( "context" "io" "net/http" - "path" "strconv" "time" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" @@ -47,20 +47,24 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str return http.DetectContentType(buf), buf, err // to not lose io.EOF } -func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) { +type getPayloadParams struct { + obj *Object + req request + bktinfo *data.BucketInfo + attrs map[string]string +} + +func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address, bktInfo *data.BucketInfo) { var ( - err error - dis = "inline" - start = time.Now() - filename string - filepath string + shouldDownload = req.QueryArgs().GetBool("download") + start = time.Now() ) prm := PrmObjectGet{ PrmAuth: PrmAuth{ BearerToken: bearerToken(ctx), }, - Address: objectAddress, + Address: objAddress, } rObj, err := h.frostfs.GetObject(ctx, prm) @@ -71,53 +75,40 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi // we can't close reader in this function, so how to do it? - if req.Request.URI().QueryArgs().GetBool("download") { - dis = "attachment" + attrs := makeAttributesMap(rObj.Header.Attributes()) + req.setAttributes(attrs) + + req.setIDs(rObj.Header) + req.setDisposition(shouldDownload, attrs) + + if err = req.setTimestamp(attrs[object.AttributeTimestamp]); err != nil { + req.log.Error(logs.CouldntParseCreationDate, + zap.String("val", attrs[object.AttributeTimestamp]), + zap.Error(err)) + response.Error(req.RequestCtx, "failed to convert timestamp: "+err.Error(), fasthttp.StatusInternalServerError) } - payloadSize := rObj.Header.PayloadSize() + payloadParams := getPayloadParams{ + obj: rObj, + req: req, + bktinfo: bktInfo, + attrs: attrs, + } + payload, payloadSize, err := h.getPayload(payloadParams) + if err != nil { + req.handleFrostFSErr(err, start) + return + } req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) - var contentType string - for _, attr := range rObj.Header.Attributes() { - key := attr.Key() - val := attr.Value() - if !isValidToken(key) || !isValidValue(val) { - continue - } - - key = utils.BackwardTransformIfSystem(key) - - req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) - switch key { - case object.AttributeFileName: - filename = val - case object.AttributeTimestamp: - value, err := strconv.ParseInt(val, 10, 64) - if err != nil { - req.log.Info(logs.CouldntParseCreationDate, - zap.String("key", key), - zap.String("val", val), - zap.Error(err)) - continue - } - req.Response.Header.Set(fasthttp.HeaderLastModified, - time.Unix(value, 0).UTC().Format(http.TimeFormat)) - case object.AttributeContentType: - contentType = val - case object.AttributeFilePath: - filepath = val - } - } - - idsToResponse(&req.Response, &rObj.Header) + contentType := attrs[object.AttributeContentType] if len(contentType) == 0 { // determine the Content-Type from the payload head var payloadHead []byte contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) { - return rObj.Payload, nil + return payload, nil }) if err != nil && err != io.EOF { req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err)) @@ -129,20 +120,26 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi var headReader io.Reader = bytes.NewReader(payloadHead) if err != io.EOF { // otherwise, we've already read full payload - headReader = io.MultiReader(headReader, rObj.Payload) + headReader = io.MultiReader(headReader, payload) } // note: we could do with io.Reader, but SetBodyStream below closes body stream // if it implements io.Closer and that's useful for us. - rObj.Payload = readCloser{headReader, rObj.Payload} + payload = readCloser{headReader, payload} } req.SetContentType(contentType) - - if filename == "" { - filename = filepath - } - - req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) - - req.Response.SetBodyStream(rObj.Payload, int(payloadSize)) + req.Response.SetBodyStream(payload, int(payloadSize)) +} + +func makeAttributesMap(attrs []object.Attribute) map[string]string { + attributes := make(map[string]string) + for _, attr := range attrs { + if !isValidToken(attr.Key()) || !isValidValue(attr.Value()) { + continue + } + key := utils.BackwardTransformIfSystem(attr.Key()) + + attributes[key] = attr.Value() + } + return attributes } diff --git a/internal/handler/utils.go b/internal/handler/utils.go index 7fa1158..4281b87 100644 --- a/internal/handler/utils.go +++ b/internal/handler/utils.go @@ -3,6 +3,9 @@ package handler import ( "context" "errors" + "net/http" + "path" + "strconv" "strings" "time" @@ -10,8 +13,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/valyala/fasthttp" "go.uber.org/zap" ) @@ -33,6 +38,51 @@ func (r *request) handleFrostFSErr(err error, start time.Time) { response.Error(r.RequestCtx, msg, statusCode) } +func (r *request) setAttributes(attrs map[string]string) { + for key, val := range attrs { + r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) + } +} + +func (r *request) setIDs(obj object.Object) { + objID, _ := obj.ID() + cnrID, _ := obj.ContainerID() + r.Response.Header.Set(hdrObjectID, objID.String()) + r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String()) + r.Response.Header.Set(hdrContainerID, cnrID.String()) +} + +func (r *request) setDisposition(shouldDownload bool, attrs map[string]string) { + const ( + inlineDisposition = "inline" + attachmentDisposition = "attachment" + ) + + dis := inlineDisposition + if shouldDownload { + dis = attachmentDisposition + } + + filename := attrs[object.AttributeFileName] + filepath := attrs[object.AttributeFilePath] + if filename == "" { + filename = filepath + } + + r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) +} + +func (r *request) setTimestamp(timestamp string) error { + value, err := strconv.ParseInt(timestamp, 10, 64) + if err != nil { + return err + } + r.Response.Header.Set(fasthttp.HeaderLastModified, + time.Unix(value, 0).UTC().Format(http.TimeFormat)) + + return nil +} + func bearerToken(ctx context.Context) *bearer.Token { if tkn, err := tokens.LoadBearerToken(ctx); err == nil { return tkn diff --git a/internal/frostfs/frostfs.go b/internal/service/frostfs/frostfs.go similarity index 97% rename from internal/frostfs/frostfs.go rename to internal/service/frostfs/frostfs.go index 6cf162a..c7e56a4 100644 --- a/internal/frostfs/frostfs.go +++ b/internal/service/frostfs/frostfs.go @@ -33,9 +33,9 @@ func NewFrostFS(p *pool.Pool) *FrostFS { } // Container implements frostfs.FrostFS interface method. -func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer) (*container.Container, error) { +func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContainer) (*container.Container, error) { prm := pool.PrmContainerGet{ - ContainerID: layerPrm.ContainerID, + ContainerID: containerPrm.ContainerID, } res, err := x.pool.GetContainer(ctx, prm) diff --git a/internal/service/frostfs/multi_object_reader.go b/internal/service/frostfs/multi_object_reader.go new file mode 100644 index 0000000..1449b29 --- /dev/null +++ b/internal/service/frostfs/multi_object_reader.go @@ -0,0 +1,234 @@ +package frostfs + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" +) + +type GetFrostFSParams struct { + // payload range + Off, Ln uint64 + + Oid oid.ID + BktInfo *data.BucketInfo +} + +type PartObj struct { + OID oid.ID + Size uint64 +} + +type readerInitiator interface { + InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error) +} + +// MultiObjectReader implements io.Reader of payloads of the object list stored in the FrostFS network. +type MultiObjectReader struct { + ctx context.Context + log *zap.Logger + + layer readerInitiator + + startPartOffset uint64 + endPartLength uint64 + + prm GetFrostFSParams + + curIndex int + curReader io.Reader + + parts []PartObj +} + +type MultiObjectReaderConfig struct { + Initiator readerInitiator + Log *zap.Logger + + // the offset of complete object and total size to read + Off, Ln uint64 + + BktInfo *data.BucketInfo + Parts []PartObj +} + +var ( + errOffsetIsOutOfRange = errors.New("offset is out of payload range") + errLengthIsOutOfRange = errors.New("length is out of payload range") + errEmptyPartsList = errors.New("empty parts list") + errorZeroRangeLength = errors.New("zero range length") +) + +func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) { + combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{ + PrmAuth: handler.PrmAuth{BearerToken: p.Bearer}, + Address: p.ObjInfo.Address(), + }) + if err != nil { + return nil, fmt.Errorf("get combined object '%s': %w", p.ObjInfo.ID.EncodeToString(), err) + } + + var parts []*data.PartInfo + if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil { + return nil, fmt.Errorf("unmarshal combined object parts: %w", err) + } + + objParts := make([]PartObj, len(parts)) + for i, part := range parts { + objParts[i] = PartObj{ + OID: part.OID, + Size: part.Size, + } + } + + return NewMultiObjectReader(ctx, MultiObjectReaderConfig{ + Initiator: x, + Off: p.Off, + Ln: p.Ln, + Parts: objParts, + BktInfo: p.BktInfo, + Log: p.Log, + }) +} + +func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) { + if len(cfg.Parts) == 0 { + return nil, errEmptyPartsList + } + + r := &MultiObjectReader{ + ctx: ctx, + layer: cfg.Initiator, + prm: GetFrostFSParams{ + BktInfo: cfg.BktInfo, + }, + parts: cfg.Parts, + log: cfg.Log, + } + + if cfg.Off+cfg.Ln == 0 { + return r, nil + } + + if cfg.Off > 0 && cfg.Ln == 0 { + return nil, errorZeroRangeLength + } + + startPartIndex, startPartOffset := findStartPart(cfg) + if startPartIndex == -1 { + return nil, errOffsetIsOutOfRange + } + r.startPartOffset = startPartOffset + + endPartIndex, endPartLength := findEndPart(cfg) + if endPartIndex == -1 { + return nil, errLengthIsOutOfRange + } + r.endPartLength = endPartLength + + r.parts = cfg.Parts[startPartIndex : endPartIndex+1] + + return r, nil +} + +func findStartPart(cfg MultiObjectReaderConfig) (index int, offset uint64) { + position := cfg.Off + for i, part := range cfg.Parts { + // Strict inequality when searching for start position to avoid reading zero length part. + if position < part.Size { + return i, position + } + position -= part.Size + } + + return -1, 0 +} + +func findEndPart(cfg MultiObjectReaderConfig) (index int, length uint64) { + position := cfg.Off + cfg.Ln + for i, part := range cfg.Parts { + // Non-strict inequality when searching for end position to avoid out of payload range error. + if position <= part.Size { + return i, position + } + position -= part.Size + } + + return -1, 0 +} + +func (x *MultiObjectReader) Read(p []byte) (n int, err error) { + if x.curReader != nil { + n, err = x.curReader.Read(p) + if !errors.Is(err, io.EOF) { + return n, err + } + x.curIndex++ + } + + if x.curIndex == len(x.parts) { + return n, io.EOF + } + + x.prm.Oid = x.parts[x.curIndex].OID + + if x.curIndex == 0 { + x.prm.Off = x.startPartOffset + x.prm.Ln = x.parts[x.curIndex].Size - x.startPartOffset + } + + if x.curIndex == len(x.parts)-1 { + x.prm.Ln = x.endPartLength - x.prm.Off + } + + x.curReader, err = x.layer.InitFrostFSObjectPayloadReader(x.ctx, x.prm) + if err != nil { + return n, fmt.Errorf("init payload reader for the next part: %w", err) + } + + x.prm.Off = 0 + x.prm.Ln = 0 + + next, err := x.Read(p[n:]) + + return n + next, err +} + +// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object. +// Zero range corresponds to full payload (panics if only offset is set). +func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error) { + var prmAuth handler.PrmAuth + + var addr oid.Address + addr.SetContainer(p.BktInfo.CID) + addr.SetObject(p.Oid) + + if p.Off+p.Ln != 0 { + prm := handler.PrmObjectRange{ + PrmAuth: prmAuth, + PayloadRange: [2]uint64{p.Off, p.Ln}, + Address: addr, + } + + return x.RangeObject(ctx, prm) + } + + prm := handler.PrmObjectGet{ + PrmAuth: prmAuth, + Address: addr, + } + + res, err := x.GetObject(ctx, prm) + if err != nil { + return nil, err + } + + return res.Payload, nil +} diff --git a/internal/service/frostfs/multi_object_reader_test.go b/internal/service/frostfs/multi_object_reader_test.go new file mode 100644 index 0000000..6b333c7 --- /dev/null +++ b/internal/service/frostfs/multi_object_reader_test.go @@ -0,0 +1,137 @@ +package frostfs + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "testing" + + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +type readerInitiatorMock struct { + parts map[oid.ID][]byte +} + +func (r *readerInitiatorMock) InitFrostFSObjectPayloadReader(_ context.Context, p GetFrostFSParams) (io.Reader, error) { + partPayload, ok := r.parts[p.Oid] + if !ok { + return nil, errors.New("part not found") + } + + if p.Off+p.Ln == 0 { + return bytes.NewReader(partPayload), nil + } + + if p.Off > uint64(len(partPayload)-1) { + return nil, fmt.Errorf("invalid offset: %d/%d", p.Off, len(partPayload)) + } + + if p.Off+p.Ln > uint64(len(partPayload)) { + return nil, fmt.Errorf("invalid range: %d-%d/%d", p.Off, p.Off+p.Ln, len(partPayload)) + } + + return bytes.NewReader(partPayload[p.Off : p.Off+p.Ln]), nil +} + +func prepareDataReader() ([]byte, []PartObj, *readerInitiatorMock) { + mockInitReader := &readerInitiatorMock{ + parts: map[oid.ID][]byte{ + oidtest.ID(): []byte("first part 1"), + oidtest.ID(): []byte("second part 2"), + oidtest.ID(): []byte("third part 3"), + }, + } + + var fullPayload []byte + parts := make([]PartObj, 0, len(mockInitReader.parts)) + for id, payload := range mockInitReader.parts { + parts = append(parts, PartObj{OID: id, Size: uint64(len(payload))}) + fullPayload = append(fullPayload, payload...) + } + + return fullPayload, parts, mockInitReader +} + +func TestMultiReader(t *testing.T) { + ctx := context.Background() + + fullPayload, parts, mockInitReader := prepareDataReader() + + for _, tc := range []struct { + name string + off uint64 + ln uint64 + err error + }{ + { + name: "simple read all", + }, + { + name: "simple read with length", + ln: uint64(len(fullPayload)), + }, + { + name: "middle of parts", + off: parts[0].Size + 2, + ln: 4, + }, + { + name: "first and second", + off: parts[0].Size - 4, + ln: 8, + }, + { + name: "first and third", + off: parts[0].Size - 4, + ln: parts[1].Size + 8, + }, + { + name: "second part", + off: parts[0].Size, + ln: parts[1].Size, + }, + { + name: "second and third", + off: parts[0].Size, + ln: parts[1].Size + parts[2].Size, + }, + { + name: "offset out of range", + off: uint64(len(fullPayload) + 1), + ln: 1, + err: errOffsetIsOutOfRange, + }, + { + name: "zero length", + off: parts[1].Size + 1, + ln: 0, + err: errorZeroRangeLength, + }, + } { + t.Run(tc.name, func(t *testing.T) { + multiReader, err := NewMultiObjectReader(ctx, MultiObjectReaderConfig{ + Initiator: mockInitReader, + Parts: parts, + Off: tc.off, + Ln: tc.ln, + }) + require.ErrorIs(t, err, tc.err) + + if tc.err == nil { + off := tc.off + ln := tc.ln + if off+ln == 0 { + ln = uint64(len(fullPayload)) + } + data, err := io.ReadAll(multiReader) + require.NoError(t, err) + require.Equal(t, fullPayload[off:off+ln], data) + } + }) + } +} diff --git a/internal/frostfs/services/pool_wrapper.go b/internal/service/frostfs/pool_wrapper.go similarity index 99% rename from internal/frostfs/services/pool_wrapper.go rename to internal/service/frostfs/pool_wrapper.go index fa70f15..b978d73 100644 --- a/internal/frostfs/services/pool_wrapper.go +++ b/internal/service/frostfs/pool_wrapper.go @@ -1,4 +1,4 @@ -package services +package frostfs import ( "context" diff --git a/tree/tree_test.go b/tree/tree_test.go index 69ac5f6..62f9914 100644 --- a/tree/tree_test.go +++ b/tree/tree_test.go @@ -21,11 +21,11 @@ func (m nodeMeta) GetValue() []byte { type nodeResponse struct { meta []nodeMeta - timestamp uint64 + timestamp []uint64 } func (n nodeResponse) GetTimestamp() []uint64 { - return []uint64{n.timestamp} + return n.timestamp } func (n nodeResponse) GetMeta() []Meta { @@ -59,7 +59,7 @@ func TestGetLatestNode(t *testing.T) { name: "one node of the object version", nodes: []NodeResponse{ nodeResponse{ - timestamp: 1, + timestamp: []uint64{1}, meta: []nodeMeta{ { key: oidKV, @@ -74,11 +74,11 @@ func TestGetLatestNode(t *testing.T) { name: "one node of the object version and one node of the secondary object", nodes: []NodeResponse{ nodeResponse{ - timestamp: 3, + timestamp: []uint64{3}, meta: []nodeMeta{}, }, nodeResponse{ - timestamp: 1, + timestamp: []uint64{1}, meta: []nodeMeta{ { key: oidKV, @@ -93,11 +93,11 @@ func TestGetLatestNode(t *testing.T) { name: "all nodes represent a secondary object", nodes: []NodeResponse{ nodeResponse{ - timestamp: 3, + timestamp: []uint64{3}, meta: []nodeMeta{}, }, nodeResponse{ - timestamp: 5, + timestamp: []uint64{5}, meta: []nodeMeta{}, }, }, @@ -107,7 +107,7 @@ func TestGetLatestNode(t *testing.T) { name: "several nodes of different types and with different timestamp", nodes: []NodeResponse{ nodeResponse{ - timestamp: 1, + timestamp: []uint64{1}, meta: []nodeMeta{ { key: oidKV, @@ -116,11 +116,11 @@ func TestGetLatestNode(t *testing.T) { }, }, nodeResponse{ - timestamp: 3, + timestamp: []uint64{3}, meta: []nodeMeta{}, }, nodeResponse{ - timestamp: 4, + timestamp: []uint64{4}, meta: []nodeMeta{ { key: oidKV, @@ -129,7 +129,7 @@ func TestGetLatestNode(t *testing.T) { }, }, nodeResponse{ - timestamp: 6, + timestamp: []uint64{6}, meta: []nodeMeta{}, }, },