From 02cf52dd5723f97b79bd2ea30d0531f1dc339c17 Mon Sep 17 00:00:00 2001 From: Nikita Zinkevich Date: Wed, 18 Sep 2024 07:35:26 +0300 Subject: [PATCH] [#142] Fix multipart-objects download Signed-off-by: Nikita Zinkevich --- cmd/http-gw/app.go | 5 +- internal/data/object.go | 41 +++ internal/handler/frostfs_mock.go | 4 + internal/handler/handler.go | 32 ++- internal/handler/head.go | 3 +- internal/handler/multipart.go | 50 ++++ internal/handler/reader.go | 99 ++++---- internal/handler/utils.go | 42 ++++ internal/{ => service}/frostfs/frostfs.go | 4 +- .../service/frostfs/multi_object_reader.go | 238 ++++++++++++++++++ .../frostfs/multi_object_reader_test.go | 137 ++++++++++ .../frostfs}/pool_wrapper.go | 14 +- tree/tree.go | 27 +- tree/tree_test.go | 24 +- 14 files changed, 635 insertions(+), 85 deletions(-) create mode 100644 internal/data/object.go rename internal/{ => service}/frostfs/frostfs.go (97%) create mode 100644 internal/service/frostfs/multi_object_reader.go create mode 100644 internal/service/frostfs/multi_object_reader_test.go rename internal/{frostfs/services => service/frostfs}/pool_wrapper.go (84%) diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go index 4c49ee4..d93ee5b 100644 --- a/cmd/http-gw/app.go +++ b/cmd/http-gw/app.go @@ -18,11 +18,10 @@ import ( v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" @@ -401,7 +400,7 @@ func (a *app) setHealthStatus() { } func (a *app) Serve() { - handler := handler.New(a.AppParams(), a.settings, tree.NewTree(services.NewPoolWrapper(a.treePool))) + handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool))) // Configure router. a.configureRouter(handler) diff --git a/internal/data/object.go b/internal/data/object.go new file mode 100644 index 0000000..2575218 --- /dev/null +++ b/internal/data/object.go @@ -0,0 +1,41 @@ +package data + +import ( + "time" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type ( + ObjectInfo struct { + ID oid.ID + CID cid.ID + + Bucket string + Name string + Size uint64 + Headers map[string]string + } + + // PartInfo is upload information about part. + PartInfo struct { + Key string `json:"key"` + UploadID string `json:"uploadId"` + Number int `json:"number"` + OID oid.ID `json:"oid"` + Size uint64 `json:"size"` + ETag string `json:"etag"` + MD5 string `json:"md5"` + Created time.Time `json:"created"` + } +) + +// Address returns object address. +func (o *ObjectInfo) Address() oid.Address { + var addr oid.Address + addr.SetContainer(o.CID) + addr.SetObject(o.ID) + + return addr +} diff --git a/internal/handler/frostfs_mock.go b/internal/handler/frostfs_mock.go index 9f4378a..c99a588 100644 --- a/internal/handler/frostfs_mock.go +++ b/internal/handler/frostfs_mock.go @@ -229,6 +229,10 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (Res return &resObjectSearchMock{res: res}, nil } +func (t *TestFrostFS) InitMultiObjectReader(context.Context, PrmInitMultiObjectReader) (io.Reader, error) { + return nil, nil +} + func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool { for _, attr := range attributes { if attr.Key() == filter.Header() { diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 4de9d9a..e9d7545 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -61,7 +61,9 @@ type PrmObjectGet struct { PrmAuth // Address to read the object header from. - Address oid.Address + Address oid.Address + Container cid.ID + Object oid.ID } // PrmObjectRange groups parameters of FrostFS.RangeObject operation. @@ -74,6 +76,10 @@ type PrmObjectRange struct { // Offset-length range of the object payload to be read. PayloadRange [2]uint64 + + Container cid.ID + + Object oid.ID } // Object represents FrostFS object. @@ -117,6 +123,16 @@ type PrmObjectSearch struct { Filters object.SearchFilters } +type PrmInitMultiObjectReader struct { + // payload range + Off, Ln uint64 + + ObjInfo *data.ObjectInfo + BktInfo *data.BucketInfo + Log *zap.Logger + Bearer *bearer.Token +} + type ResObjectSearch interface { Read(buf []oid.ID) (int, error) Iterate(f func(oid.ID) bool) error @@ -138,6 +154,8 @@ type FrostFS interface { RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error) CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error) + InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error) + utils.EpochInfoFetcher } @@ -177,7 +195,7 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler { // byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that // prepares request and object address to it. -func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) { var ( idCnr, _ = c.UserValue("cid").(string) idObj, _ = c.UserValue("oid").(string) @@ -203,12 +221,12 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ addr.SetContainer(bktInfo.CID) addr.SetObject(*objID) - f(ctx, *h.newRequest(c, log), addr) + f(ctx, *h.newRequest(c, log), addr, bktInfo) } // byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that // prepares request and object address to it. -func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) { var ( bucketname = req.UserValue("cid").(string) key = req.UserValue("oid").(string) @@ -250,11 +268,11 @@ func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, addr.SetContainer(bktInfo.CID) addr.SetObject(foundOid.OID) - f(ctx, *h.newRequest(req, log), addr) + f(ctx, *h.newRequest(req, log), addr, bktInfo) } // byAttribute is a wrapper similar to byAddress. -func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { +func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) { scid, _ := c.UserValue("cid").(string) key, _ := c.UserValue("attr_key").(string) val, _ := c.UserValue("attr_val").(string) @@ -311,7 +329,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re addrObj.SetContainer(bktInfo.CID) addrObj.SetObject(buf[0]) - f(ctx, *h.newRequest(c, log), addrObj) + f(ctx, *h.newRequest(c, log), addrObj, bktInfo) } // resolveContainer decode container id, if it's not a valid container id diff --git a/internal/handler/head.go b/internal/handler/head.go index f0a1e94..3f8cf92 100644 --- a/internal/handler/head.go +++ b/internal/handler/head.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -24,7 +25,7 @@ const ( hdrContainerID = "X-Container-Id" ) -func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) { +func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address, _ *data.BucketInfo) { var start = time.Now() btoken := bearerToken(ctx) diff --git a/internal/handler/multipart.go b/internal/handler/multipart.go index de9242f..d470e14 100644 --- a/internal/handler/multipart.go +++ b/internal/handler/multipart.go @@ -1,13 +1,21 @@ package handler import ( + "errors" "io" + "strconv" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "go.uber.org/zap" ) +const ( + frostFSSystemMetadataPrefix = "S3-" + attributeMultipartObjectSize = frostFSSystemMetadataPrefix + "Multipart-Object-Size" +) + // MultipartFile provides standard ReadCloser interface and also allows one to // get file name, it's used for multipart uploads. type MultipartFile interface { @@ -45,3 +53,45 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF return part, nil } } + +// getPayload returns initial payload if object is not multipart else composes new reader with parts data. +func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) { + sizeValue, ok := p.attrs[attributeMultipartObjectSize] + if !ok { + return p.obj.Payload, p.obj.Header.PayloadSize(), nil + } + cid, ok := p.obj.Header.ContainerID() + if !ok { + return nil, 0, errors.New("no container id set") + } + oid, ok := p.obj.Header.ID() + if !ok { + return nil, 0, errors.New("no object id set") + } + size, err := strconv.ParseUint(sizeValue, 10, 64) + if err != nil { + return nil, 0, err + } + ctx := p.req.RequestCtx + params := PrmInitMultiObjectReader{ + Off: 0, + Ln: 0, + ObjInfo: &data.ObjectInfo{ + ID: oid, + CID: cid, + Bucket: p.bktinfo.Name, + Name: p.attrs["FilePath"], + Size: size, + Headers: p.attrs, + }, + BktInfo: p.bktinfo, + Log: h.log, + Bearer: bearerToken(ctx), + } + payload, err := h.frostfs.InitMultiObjectReader(ctx, params) + if err != nil { + return nil, 0, err + } + + return io.NopCloser(payload), size, nil +} diff --git a/internal/handler/reader.go b/internal/handler/reader.go index 65d258b..3919eb6 100644 --- a/internal/handler/reader.go +++ b/internal/handler/reader.go @@ -5,10 +5,10 @@ import ( "context" "io" "net/http" - "path" "strconv" "time" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" @@ -47,19 +47,24 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str return http.DetectContentType(buf), buf, err // to not lose io.EOF } -func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) { +type getPayloadParams struct { + obj *Object + req request + bktinfo *data.BucketInfo + attrs map[string]string +} + +func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address, bktInfo *data.BucketInfo) { var ( - err error - dis = "inline" - start = time.Now() - filename string + shouldDownload = req.QueryArgs().GetBool("download") + start = time.Now() ) prm := PrmObjectGet{ PrmAuth: PrmAuth{ BearerToken: bearerToken(ctx), }, - Address: objectAddress, + Address: objAddress, } rObj, err := h.frostfs.GetObject(ctx, prm) @@ -70,51 +75,40 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi // we can't close reader in this function, so how to do it? - if req.Request.URI().QueryArgs().GetBool("download") { - dis = "attachment" + attrs := makeAttributesMap(rObj.Header.Attributes()) + req.setAttributes(attrs) + + req.setIDs(rObj.Header) + req.setDisposition(shouldDownload, attrs[object.AttributeFileName]) + + if err = req.setTimestamp(attrs[object.AttributeTimestamp]); err != nil { + req.log.Error(logs.CouldntParseCreationDate, + zap.String("val", attrs[object.AttributeTimestamp]), + zap.Error(err)) + response.Error(req.RequestCtx, "failed to convert timestamp: "+err.Error(), fasthttp.StatusInternalServerError) } - payloadSize := rObj.Header.PayloadSize() + payloadParams := getPayloadParams{ + obj: rObj, + req: req, + bktinfo: bktInfo, + attrs: attrs, + } + payload, payloadSize, err := h.getPayload(payloadParams) + if err != nil { + req.handleFrostFSErr(err, start) + return + } req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) - var contentType string - for _, attr := range rObj.Header.Attributes() { - key := attr.Key() - val := attr.Value() - if !isValidToken(key) || !isValidValue(val) { - continue - } - - key = utils.BackwardTransformIfSystem(key) - - req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) - switch key { - case object.AttributeFileName: - filename = val - case object.AttributeTimestamp: - value, err := strconv.ParseInt(val, 10, 64) - if err != nil { - req.log.Info(logs.CouldntParseCreationDate, - zap.String("key", key), - zap.String("val", val), - zap.Error(err)) - continue - } - req.Response.Header.Set(fasthttp.HeaderLastModified, - time.Unix(value, 0).UTC().Format(http.TimeFormat)) - case object.AttributeContentType: - contentType = val - } - } - - idsToResponse(&req.Response, &rObj.Header) + contentType := attrs[object.AttributeContentType] if len(contentType) == 0 { // determine the Content-Type from the payload head var payloadHead []byte contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) { - return rObj.Payload, nil + return payload, nil }) if err != nil && err != io.EOF { req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err)) @@ -126,16 +120,27 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi var headReader io.Reader = bytes.NewReader(payloadHead) if err != io.EOF { // otherwise, we've already read full payload - headReader = io.MultiReader(headReader, rObj.Payload) + headReader = io.MultiReader(headReader, payload) } // note: we could do with io.Reader, but SetBodyStream below closes body stream // if it implements io.Closer and that's useful for us. - rObj.Payload = readCloser{headReader, rObj.Payload} + payload = readCloser{headReader, payload} } req.SetContentType(contentType) - req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) - - req.Response.SetBodyStream(rObj.Payload, int(payloadSize)) + req.Response.SetBodyStream(payload, int(payloadSize)) +} + +func makeAttributesMap(attrs []object.Attribute) map[string]string { + attributes := make(map[string]string) + for _, attr := range attrs { + if !isValidToken(attr.Key()) || !isValidValue(attr.Value()) { + continue + } + key := utils.BackwardTransformIfSystem(attr.Key()) + + attributes[key] = attr.Value() + } + return attributes } diff --git a/internal/handler/utils.go b/internal/handler/utils.go index a5a53ed..e322cc5 100644 --- a/internal/handler/utils.go +++ b/internal/handler/utils.go @@ -2,14 +2,19 @@ package handler import ( "context" + "net/http" + "path" + "strconv" "strings" "time" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "github.com/valyala/fasthttp" "go.uber.org/zap" ) @@ -31,6 +36,43 @@ func (r *request) handleFrostFSErr(err error, start time.Time) { response.Error(r.RequestCtx, msg, statusCode) } +func (r *request) setAttributes(attrs map[string]string) { + for key, val := range attrs { + r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) + } +} + +func (r *request) setIDs(obj object.Object) { + objID, _ := obj.ID() + cnrID, _ := obj.ContainerID() + r.Response.Header.Set(hdrObjectID, objID.String()) + r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String()) + r.Response.Header.Set(hdrContainerID, cnrID.String()) +} + +func (r *request) setDisposition(shouldDownload bool, filename string) { + const ( + inlineDisposition = "inline" + attachmentDisposition = "attachment" + ) + dis := inlineDisposition + if shouldDownload { + dis = attachmentDisposition + } + r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) +} + +func (r *request) setTimestamp(timestamp string) error { + value, err := strconv.ParseInt(timestamp, 10, 64) + if err != nil { + return err + } + r.Response.Header.Set(fasthttp.HeaderLastModified, + time.Unix(value, 0).UTC().Format(http.TimeFormat)) + + return nil +} + func bearerToken(ctx context.Context) *bearer.Token { if tkn, err := tokens.LoadBearerToken(ctx); err == nil { return tkn diff --git a/internal/frostfs/frostfs.go b/internal/service/frostfs/frostfs.go similarity index 97% rename from internal/frostfs/frostfs.go rename to internal/service/frostfs/frostfs.go index 6cf162a..c7e56a4 100644 --- a/internal/frostfs/frostfs.go +++ b/internal/service/frostfs/frostfs.go @@ -33,9 +33,9 @@ func NewFrostFS(p *pool.Pool) *FrostFS { } // Container implements frostfs.FrostFS interface method. -func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer) (*container.Container, error) { +func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContainer) (*container.Container, error) { prm := pool.PrmContainerGet{ - ContainerID: layerPrm.ContainerID, + ContainerID: containerPrm.ContainerID, } res, err := x.pool.GetContainer(ctx, prm) diff --git a/internal/service/frostfs/multi_object_reader.go b/internal/service/frostfs/multi_object_reader.go new file mode 100644 index 0000000..9aa064e --- /dev/null +++ b/internal/service/frostfs/multi_object_reader.go @@ -0,0 +1,238 @@ +package frostfs + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" +) + +type GetFrostFSParams struct { + // payload range + Off, Ln uint64 + + Oid oid.ID + BktInfo *data.BucketInfo +} + +type PartObj struct { + OID oid.ID + Size uint64 +} + +type readerInitiator interface { + InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error) +} + +// MultiObjectReader implements io.Reader of payloads of the object list stored in the FrostFS network. +type MultiObjectReader struct { + ctx context.Context + log *zap.Logger + + layer readerInitiator + + startPartOffset uint64 + endPartLength uint64 + + prm GetFrostFSParams + + curIndex int + curReader io.Reader + + parts []PartObj +} + +type MultiObjectReaderConfig struct { + Initiator readerInitiator + Log *zap.Logger + + // the offset of complete object and total size to read + Off, Ln uint64 + + BktInfo *data.BucketInfo + Parts []PartObj +} + +var ( + errOffsetIsOutOfRange = errors.New("offset is out of payload range") + errLengthIsOutOfRange = errors.New("length is out of payload range") + errEmptyPartsList = errors.New("empty parts list") + errorZeroRangeLength = errors.New("zero range length") +) + +func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) { + combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{ + PrmAuth: handler.PrmAuth{BearerToken: p.Bearer}, + Address: p.ObjInfo.Address(), + }) + if err != nil { + return nil, fmt.Errorf("get combined object '%s': %w", p.ObjInfo.ID.EncodeToString(), err) + } + + var parts []*data.PartInfo + if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil { + return nil, fmt.Errorf("unmarshal combined object parts: %w", err) + } + + objParts := make([]PartObj, len(parts)) + for i, part := range parts { + objParts[i] = PartObj{ + OID: part.OID, + Size: part.Size, + } + } + + return NewMultiObjectReader(ctx, MultiObjectReaderConfig{ + Initiator: x, + Off: p.Off, + Ln: p.Ln, + Parts: objParts, + BktInfo: p.BktInfo, + Log: p.Log, + }) +} + +func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) { + if len(cfg.Parts) == 0 { + return nil, errEmptyPartsList + } + + r := &MultiObjectReader{ + ctx: ctx, + layer: cfg.Initiator, + prm: GetFrostFSParams{ + BktInfo: cfg.BktInfo, + }, + parts: cfg.Parts, + log: cfg.Log, + } + + if cfg.Off+cfg.Ln == 0 { + return r, nil + } + + if cfg.Off > 0 && cfg.Ln == 0 { + return nil, errorZeroRangeLength + } + + startPartIndex, startPartOffset := findStartPart(cfg) + if startPartIndex == -1 { + return nil, errOffsetIsOutOfRange + } + r.startPartOffset = startPartOffset + + endPartIndex, endPartLength := findEndPart(cfg) + if endPartIndex == -1 { + return nil, errLengthIsOutOfRange + } + r.endPartLength = endPartLength + + r.parts = cfg.Parts[startPartIndex : endPartIndex+1] + + return r, nil +} + +func findStartPart(cfg MultiObjectReaderConfig) (index int, offset uint64) { + position := cfg.Off + for i, part := range cfg.Parts { + // Strict inequality when searching for start position to avoid reading zero length part. + if position < part.Size { + return i, position + } + position -= part.Size + } + + return -1, 0 +} + +func findEndPart(cfg MultiObjectReaderConfig) (index int, length uint64) { + position := cfg.Off + cfg.Ln + for i, part := range cfg.Parts { + // Non-strict inequality when searching for end position to avoid out of payload range error. + if position <= part.Size { + return i, position + } + position -= part.Size + } + + return -1, 0 +} + +func (x *MultiObjectReader) Read(p []byte) (n int, err error) { + if x.curReader != nil { + n, err = x.curReader.Read(p) + if !errors.Is(err, io.EOF) { + return n, err + } + x.curIndex++ + } + + if x.curIndex == len(x.parts) { + return n, io.EOF + } + + x.prm.Oid = x.parts[x.curIndex].OID + + if x.curIndex == 0 { + x.prm.Off = x.startPartOffset + x.prm.Ln = x.parts[x.curIndex].Size - x.startPartOffset + } + + if x.curIndex == len(x.parts)-1 { + x.prm.Ln = x.endPartLength - x.prm.Off + } + + x.curReader, err = x.layer.InitFrostFSObjectPayloadReader(x.ctx, x.prm) + if err != nil { + return n, fmt.Errorf("init payload reader for the next part: %w", err) + } + + x.prm.Off = 0 + x.prm.Ln = 0 + + next, err := x.Read(p[n:]) + + return n + next, err +} + +// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object. +// Zero range corresponds to full payload (panics if only offset is set). +func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error) { + var prmAuth handler.PrmAuth + + var addr oid.Address + addr.SetContainer(p.BktInfo.CID) + addr.SetObject(p.Oid) + + if p.Off+p.Ln != 0 { + prm := handler.PrmObjectRange{ + PrmAuth: prmAuth, + Container: p.BktInfo.CID, + Object: p.Oid, + PayloadRange: [2]uint64{p.Off, p.Ln}, + Address: addr, + } + + return x.RangeObject(ctx, prm) + } + + prm := handler.PrmObjectGet{ + PrmAuth: prmAuth, + Container: p.BktInfo.CID, + Object: p.Oid, + Address: addr, + } + + res, err := x.GetObject(ctx, prm) + if err != nil { + return nil, err + } + + return res.Payload, nil +} diff --git a/internal/service/frostfs/multi_object_reader_test.go b/internal/service/frostfs/multi_object_reader_test.go new file mode 100644 index 0000000..6b333c7 --- /dev/null +++ b/internal/service/frostfs/multi_object_reader_test.go @@ -0,0 +1,137 @@ +package frostfs + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "testing" + + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +type readerInitiatorMock struct { + parts map[oid.ID][]byte +} + +func (r *readerInitiatorMock) InitFrostFSObjectPayloadReader(_ context.Context, p GetFrostFSParams) (io.Reader, error) { + partPayload, ok := r.parts[p.Oid] + if !ok { + return nil, errors.New("part not found") + } + + if p.Off+p.Ln == 0 { + return bytes.NewReader(partPayload), nil + } + + if p.Off > uint64(len(partPayload)-1) { + return nil, fmt.Errorf("invalid offset: %d/%d", p.Off, len(partPayload)) + } + + if p.Off+p.Ln > uint64(len(partPayload)) { + return nil, fmt.Errorf("invalid range: %d-%d/%d", p.Off, p.Off+p.Ln, len(partPayload)) + } + + return bytes.NewReader(partPayload[p.Off : p.Off+p.Ln]), nil +} + +func prepareDataReader() ([]byte, []PartObj, *readerInitiatorMock) { + mockInitReader := &readerInitiatorMock{ + parts: map[oid.ID][]byte{ + oidtest.ID(): []byte("first part 1"), + oidtest.ID(): []byte("second part 2"), + oidtest.ID(): []byte("third part 3"), + }, + } + + var fullPayload []byte + parts := make([]PartObj, 0, len(mockInitReader.parts)) + for id, payload := range mockInitReader.parts { + parts = append(parts, PartObj{OID: id, Size: uint64(len(payload))}) + fullPayload = append(fullPayload, payload...) + } + + return fullPayload, parts, mockInitReader +} + +func TestMultiReader(t *testing.T) { + ctx := context.Background() + + fullPayload, parts, mockInitReader := prepareDataReader() + + for _, tc := range []struct { + name string + off uint64 + ln uint64 + err error + }{ + { + name: "simple read all", + }, + { + name: "simple read with length", + ln: uint64(len(fullPayload)), + }, + { + name: "middle of parts", + off: parts[0].Size + 2, + ln: 4, + }, + { + name: "first and second", + off: parts[0].Size - 4, + ln: 8, + }, + { + name: "first and third", + off: parts[0].Size - 4, + ln: parts[1].Size + 8, + }, + { + name: "second part", + off: parts[0].Size, + ln: parts[1].Size, + }, + { + name: "second and third", + off: parts[0].Size, + ln: parts[1].Size + parts[2].Size, + }, + { + name: "offset out of range", + off: uint64(len(fullPayload) + 1), + ln: 1, + err: errOffsetIsOutOfRange, + }, + { + name: "zero length", + off: parts[1].Size + 1, + ln: 0, + err: errorZeroRangeLength, + }, + } { + t.Run(tc.name, func(t *testing.T) { + multiReader, err := NewMultiObjectReader(ctx, MultiObjectReaderConfig{ + Initiator: mockInitReader, + Parts: parts, + Off: tc.off, + Ln: tc.ln, + }) + require.ErrorIs(t, err, tc.err) + + if tc.err == nil { + off := tc.off + ln := tc.ln + if off+ln == 0 { + ln = uint64(len(fullPayload)) + } + data, err := io.ReadAll(multiReader) + require.NoError(t, err) + require.Equal(t, fullPayload[off:off+ln], data) + } + }) + } +} diff --git a/internal/frostfs/services/pool_wrapper.go b/internal/service/frostfs/pool_wrapper.go similarity index 84% rename from internal/frostfs/services/pool_wrapper.go rename to internal/service/frostfs/pool_wrapper.go index f7b0a26..ba250db 100644 --- a/internal/frostfs/services/pool_wrapper.go +++ b/internal/service/frostfs/pool_wrapper.go @@ -1,4 +1,4 @@ -package services +package frostfs import ( "context" @@ -15,16 +15,16 @@ type GetNodeByPathResponseInfoWrapper struct { response *grpcService.GetNodeByPathResponse_Info } -func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 { - return n.response.GetNodeId() +func (n GetNodeByPathResponseInfoWrapper) GetNodeID() []uint64 { + return []uint64{n.response.GetNodeId()} } -func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 { - return n.response.GetParentId() +func (n GetNodeByPathResponseInfoWrapper) GetParentID() []uint64 { + return []uint64{n.response.GetParentId()} } -func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 { - return n.response.GetTimestamp() +func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() []uint64 { + return []uint64{n.response.GetTimestamp()} } func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta { diff --git a/tree/tree.go b/tree/tree.go index a9135eb..bfaef09 100644 --- a/tree/tree.go +++ b/tree/tree.go @@ -73,7 +73,7 @@ type Meta interface { type NodeResponse interface { GetMeta() []Meta - GetTimestamp() uint64 + GetTimestamp() []uint64 } func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { @@ -144,7 +144,7 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s return nil, err } - latestNode, err := getLatestNode(nodes) + latestNode, err := getLatestVersionNode(nodes) if err != nil { return nil, err } @@ -152,17 +152,20 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s return newNodeVersion(latestNode) } -func getLatestNode(nodes []NodeResponse) (NodeResponse, error) { +func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) { var ( maxCreationTime uint64 targetIndexNode = -1 ) for i, node := range nodes { - currentCreationTime := node.GetTimestamp() - if checkExistOID(node.GetMeta()) && currentCreationTime > maxCreationTime { - maxCreationTime = currentCreationTime + if !checkExistOID(node.GetMeta()) { + continue + } + + if currentCreationTime := getMaxTimestamp(node); currentCreationTime > maxCreationTime { targetIndexNode = i + maxCreationTime = currentCreationTime } } @@ -187,3 +190,15 @@ func checkExistOID(meta []Meta) bool { func pathFromName(objectName string) []string { return strings.Split(objectName, separator) } + +func getMaxTimestamp(node NodeResponse) uint64 { + var maxTimestamp uint64 + + for _, timestamp := range node.GetTimestamp() { + if timestamp > maxTimestamp { + maxTimestamp = timestamp + } + } + + return maxTimestamp +} diff --git a/tree/tree_test.go b/tree/tree_test.go index 7cd2314..9522709 100644 --- a/tree/tree_test.go +++ b/tree/tree_test.go @@ -21,10 +21,10 @@ func (m nodeMeta) GetValue() []byte { type nodeResponse struct { meta []nodeMeta - timestamp uint64 + timestamp []uint64 } -func (n nodeResponse) GetTimestamp() uint64 { +func (n nodeResponse) GetTimestamp() []uint64 { return n.timestamp } @@ -52,7 +52,7 @@ func TestGetLatestNode(t *testing.T) { name: "one node of the object version", nodes: []NodeResponse{ nodeResponse{ - timestamp: 1, + timestamp: []uint64{1}, meta: []nodeMeta{ { key: oidKV, @@ -67,11 +67,11 @@ func TestGetLatestNode(t *testing.T) { name: "one node of the object version and one node of the secondary object", nodes: []NodeResponse{ nodeResponse{ - timestamp: 3, + timestamp: []uint64{3}, meta: []nodeMeta{}, }, nodeResponse{ - timestamp: 1, + timestamp: []uint64{1}, meta: []nodeMeta{ { key: oidKV, @@ -86,11 +86,11 @@ func TestGetLatestNode(t *testing.T) { name: "all nodes represent a secondary object", nodes: []NodeResponse{ nodeResponse{ - timestamp: 3, + timestamp: []uint64{3}, meta: []nodeMeta{}, }, nodeResponse{ - timestamp: 5, + timestamp: []uint64{5}, meta: []nodeMeta{}, }, }, @@ -100,7 +100,7 @@ func TestGetLatestNode(t *testing.T) { name: "several nodes of different types and with different timestamp", nodes: []NodeResponse{ nodeResponse{ - timestamp: 1, + timestamp: []uint64{1}, meta: []nodeMeta{ { key: oidKV, @@ -109,11 +109,11 @@ func TestGetLatestNode(t *testing.T) { }, }, nodeResponse{ - timestamp: 3, + timestamp: []uint64{3}, meta: []nodeMeta{}, }, nodeResponse{ - timestamp: 4, + timestamp: []uint64{4}, meta: []nodeMeta{ { key: oidKV, @@ -122,7 +122,7 @@ func TestGetLatestNode(t *testing.T) { }, }, nodeResponse{ - timestamp: 6, + timestamp: []uint64{6}, meta: []nodeMeta{}, }, }, @@ -130,7 +130,7 @@ func TestGetLatestNode(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - actualNode, err := getLatestNode(tc.nodes) + actualNode, err := getLatestVersionNode(tc.nodes) if tc.error { require.Error(t, err) return