[#142] Fix multipart-objects download
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
77eb474581
commit
02cf52dd57
14 changed files with 635 additions and 85 deletions
|
@ -18,11 +18,10 @@ import (
|
||||||
|
|
||||||
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
|
@ -401,7 +400,7 @@ func (a *app) setHealthStatus() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) Serve() {
|
func (a *app) Serve() {
|
||||||
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(services.NewPoolWrapper(a.treePool)))
|
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)))
|
||||||
|
|
||||||
// Configure router.
|
// Configure router.
|
||||||
a.configureRouter(handler)
|
a.configureRouter(handler)
|
||||||
|
|
41
internal/data/object.go
Normal file
41
internal/data/object.go
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
package data
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
ObjectInfo struct {
|
||||||
|
ID oid.ID
|
||||||
|
CID cid.ID
|
||||||
|
|
||||||
|
Bucket string
|
||||||
|
Name string
|
||||||
|
Size uint64
|
||||||
|
Headers map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// PartInfo is upload information about part.
|
||||||
|
PartInfo struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
UploadID string `json:"uploadId"`
|
||||||
|
Number int `json:"number"`
|
||||||
|
OID oid.ID `json:"oid"`
|
||||||
|
Size uint64 `json:"size"`
|
||||||
|
ETag string `json:"etag"`
|
||||||
|
MD5 string `json:"md5"`
|
||||||
|
Created time.Time `json:"created"`
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
// Address returns object address.
|
||||||
|
func (o *ObjectInfo) Address() oid.Address {
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(o.CID)
|
||||||
|
addr.SetObject(o.ID)
|
||||||
|
|
||||||
|
return addr
|
||||||
|
}
|
|
@ -229,6 +229,10 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (Res
|
||||||
return &resObjectSearchMock{res: res}, nil
|
return &resObjectSearchMock{res: res}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) InitMultiObjectReader(context.Context, PrmInitMultiObjectReader) (io.Reader, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
||||||
for _, attr := range attributes {
|
for _, attr := range attributes {
|
||||||
if attr.Key() == filter.Header() {
|
if attr.Key() == filter.Header() {
|
||||||
|
|
|
@ -61,7 +61,9 @@ type PrmObjectGet struct {
|
||||||
PrmAuth
|
PrmAuth
|
||||||
|
|
||||||
// Address to read the object header from.
|
// Address to read the object header from.
|
||||||
Address oid.Address
|
Address oid.Address
|
||||||
|
Container cid.ID
|
||||||
|
Object oid.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
|
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
|
||||||
|
@ -74,6 +76,10 @@ type PrmObjectRange struct {
|
||||||
|
|
||||||
// Offset-length range of the object payload to be read.
|
// Offset-length range of the object payload to be read.
|
||||||
PayloadRange [2]uint64
|
PayloadRange [2]uint64
|
||||||
|
|
||||||
|
Container cid.ID
|
||||||
|
|
||||||
|
Object oid.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object represents FrostFS object.
|
// Object represents FrostFS object.
|
||||||
|
@ -117,6 +123,16 @@ type PrmObjectSearch struct {
|
||||||
Filters object.SearchFilters
|
Filters object.SearchFilters
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PrmInitMultiObjectReader struct {
|
||||||
|
// payload range
|
||||||
|
Off, Ln uint64
|
||||||
|
|
||||||
|
ObjInfo *data.ObjectInfo
|
||||||
|
BktInfo *data.BucketInfo
|
||||||
|
Log *zap.Logger
|
||||||
|
Bearer *bearer.Token
|
||||||
|
}
|
||||||
|
|
||||||
type ResObjectSearch interface {
|
type ResObjectSearch interface {
|
||||||
Read(buf []oid.ID) (int, error)
|
Read(buf []oid.ID) (int, error)
|
||||||
Iterate(f func(oid.ID) bool) error
|
Iterate(f func(oid.ID) bool) error
|
||||||
|
@ -138,6 +154,8 @@ type FrostFS interface {
|
||||||
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
||||||
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
||||||
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
||||||
|
InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error)
|
||||||
|
|
||||||
utils.EpochInfoFetcher
|
utils.EpochInfoFetcher
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,7 +195,7 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
|
||||||
|
|
||||||
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||||
// prepares request and object address to it.
|
// prepares request and object address to it.
|
||||||
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) {
|
||||||
var (
|
var (
|
||||||
idCnr, _ = c.UserValue("cid").(string)
|
idCnr, _ = c.UserValue("cid").(string)
|
||||||
idObj, _ = c.UserValue("oid").(string)
|
idObj, _ = c.UserValue("oid").(string)
|
||||||
|
@ -203,12 +221,12 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
||||||
addr.SetContainer(bktInfo.CID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
addr.SetObject(*objID)
|
addr.SetObject(*objID)
|
||||||
|
|
||||||
f(ctx, *h.newRequest(c, log), addr)
|
f(ctx, *h.newRequest(c, log), addr, bktInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||||
// prepares request and object address to it.
|
// prepares request and object address to it.
|
||||||
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) {
|
||||||
var (
|
var (
|
||||||
bucketname = req.UserValue("cid").(string)
|
bucketname = req.UserValue("cid").(string)
|
||||||
key = req.UserValue("oid").(string)
|
key = req.UserValue("oid").(string)
|
||||||
|
@ -250,11 +268,11 @@ func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context,
|
||||||
addr.SetContainer(bktInfo.CID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
addr.SetObject(foundOid.OID)
|
addr.SetObject(foundOid.OID)
|
||||||
|
|
||||||
f(ctx, *h.newRequest(req, log), addr)
|
f(ctx, *h.newRequest(req, log), addr, bktInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// byAttribute is a wrapper similar to byAddress.
|
// byAttribute is a wrapper similar to byAddress.
|
||||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) {
|
||||||
scid, _ := c.UserValue("cid").(string)
|
scid, _ := c.UserValue("cid").(string)
|
||||||
key, _ := c.UserValue("attr_key").(string)
|
key, _ := c.UserValue("attr_key").(string)
|
||||||
val, _ := c.UserValue("attr_val").(string)
|
val, _ := c.UserValue("attr_val").(string)
|
||||||
|
@ -311,7 +329,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
||||||
addrObj.SetContainer(bktInfo.CID)
|
addrObj.SetContainer(bktInfo.CID)
|
||||||
addrObj.SetObject(buf[0])
|
addrObj.SetObject(buf[0])
|
||||||
|
|
||||||
f(ctx, *h.newRequest(c, log), addrObj)
|
f(ctx, *h.newRequest(c, log), addrObj, bktInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// resolveContainer decode container id, if it's not a valid container id
|
// resolveContainer decode container id, if it's not a valid container id
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -24,7 +25,7 @@ const (
|
||||||
hdrContainerID = "X-Container-Id"
|
hdrContainerID = "X-Container-Id"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {
|
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address, _ *data.BucketInfo) {
|
||||||
var start = time.Now()
|
var start = time.Now()
|
||||||
|
|
||||||
btoken := bearerToken(ctx)
|
btoken := bearerToken(ctx)
|
||||||
|
|
|
@ -1,13 +1,21 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
frostFSSystemMetadataPrefix = "S3-"
|
||||||
|
attributeMultipartObjectSize = frostFSSystemMetadataPrefix + "Multipart-Object-Size"
|
||||||
|
)
|
||||||
|
|
||||||
// MultipartFile provides standard ReadCloser interface and also allows one to
|
// MultipartFile provides standard ReadCloser interface and also allows one to
|
||||||
// get file name, it's used for multipart uploads.
|
// get file name, it's used for multipart uploads.
|
||||||
type MultipartFile interface {
|
type MultipartFile interface {
|
||||||
|
@ -45,3 +53,45 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
||||||
return part, nil
|
return part, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
|
||||||
|
func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) {
|
||||||
|
sizeValue, ok := p.attrs[attributeMultipartObjectSize]
|
||||||
|
if !ok {
|
||||||
|
return p.obj.Payload, p.obj.Header.PayloadSize(), nil
|
||||||
|
}
|
||||||
|
cid, ok := p.obj.Header.ContainerID()
|
||||||
|
if !ok {
|
||||||
|
return nil, 0, errors.New("no container id set")
|
||||||
|
}
|
||||||
|
oid, ok := p.obj.Header.ID()
|
||||||
|
if !ok {
|
||||||
|
return nil, 0, errors.New("no object id set")
|
||||||
|
}
|
||||||
|
size, err := strconv.ParseUint(sizeValue, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
ctx := p.req.RequestCtx
|
||||||
|
params := PrmInitMultiObjectReader{
|
||||||
|
Off: 0,
|
||||||
|
Ln: 0,
|
||||||
|
ObjInfo: &data.ObjectInfo{
|
||||||
|
ID: oid,
|
||||||
|
CID: cid,
|
||||||
|
Bucket: p.bktinfo.Name,
|
||||||
|
Name: p.attrs["FilePath"],
|
||||||
|
Size: size,
|
||||||
|
Headers: p.attrs,
|
||||||
|
},
|
||||||
|
BktInfo: p.bktinfo,
|
||||||
|
Log: h.log,
|
||||||
|
Bearer: bearerToken(ctx),
|
||||||
|
}
|
||||||
|
payload, err := h.frostfs.InitMultiObjectReader(ctx, params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return io.NopCloser(payload), size, nil
|
||||||
|
}
|
||||||
|
|
|
@ -5,10 +5,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
|
@ -47,19 +47,24 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
|
||||||
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) {
|
type getPayloadParams struct {
|
||||||
|
obj *Object
|
||||||
|
req request
|
||||||
|
bktinfo *data.BucketInfo
|
||||||
|
attrs map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address, bktInfo *data.BucketInfo) {
|
||||||
var (
|
var (
|
||||||
err error
|
shouldDownload = req.QueryArgs().GetBool("download")
|
||||||
dis = "inline"
|
start = time.Now()
|
||||||
start = time.Now()
|
|
||||||
filename string
|
|
||||||
)
|
)
|
||||||
|
|
||||||
prm := PrmObjectGet{
|
prm := PrmObjectGet{
|
||||||
PrmAuth: PrmAuth{
|
PrmAuth: PrmAuth{
|
||||||
BearerToken: bearerToken(ctx),
|
BearerToken: bearerToken(ctx),
|
||||||
},
|
},
|
||||||
Address: objectAddress,
|
Address: objAddress,
|
||||||
}
|
}
|
||||||
|
|
||||||
rObj, err := h.frostfs.GetObject(ctx, prm)
|
rObj, err := h.frostfs.GetObject(ctx, prm)
|
||||||
|
@ -70,51 +75,40 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
||||||
|
|
||||||
// we can't close reader in this function, so how to do it?
|
// we can't close reader in this function, so how to do it?
|
||||||
|
|
||||||
if req.Request.URI().QueryArgs().GetBool("download") {
|
attrs := makeAttributesMap(rObj.Header.Attributes())
|
||||||
dis = "attachment"
|
req.setAttributes(attrs)
|
||||||
|
|
||||||
|
req.setIDs(rObj.Header)
|
||||||
|
req.setDisposition(shouldDownload, attrs[object.AttributeFileName])
|
||||||
|
|
||||||
|
if err = req.setTimestamp(attrs[object.AttributeTimestamp]); err != nil {
|
||||||
|
req.log.Error(logs.CouldntParseCreationDate,
|
||||||
|
zap.String("val", attrs[object.AttributeTimestamp]),
|
||||||
|
zap.Error(err))
|
||||||
|
response.Error(req.RequestCtx, "failed to convert timestamp: "+err.Error(), fasthttp.StatusInternalServerError)
|
||||||
}
|
}
|
||||||
|
|
||||||
payloadSize := rObj.Header.PayloadSize()
|
payloadParams := getPayloadParams{
|
||||||
|
obj: rObj,
|
||||||
|
req: req,
|
||||||
|
bktinfo: bktInfo,
|
||||||
|
attrs: attrs,
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, payloadSize, err := h.getPayload(payloadParams)
|
||||||
|
if err != nil {
|
||||||
|
req.handleFrostFSErr(err, start)
|
||||||
|
return
|
||||||
|
}
|
||||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||||
var contentType string
|
|
||||||
for _, attr := range rObj.Header.Attributes() {
|
|
||||||
key := attr.Key()
|
|
||||||
val := attr.Value()
|
|
||||||
if !isValidToken(key) || !isValidValue(val) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
key = utils.BackwardTransformIfSystem(key)
|
|
||||||
|
|
||||||
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
|
||||||
switch key {
|
|
||||||
case object.AttributeFileName:
|
|
||||||
filename = val
|
|
||||||
case object.AttributeTimestamp:
|
|
||||||
value, err := strconv.ParseInt(val, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
req.log.Info(logs.CouldntParseCreationDate,
|
|
||||||
zap.String("key", key),
|
|
||||||
zap.String("val", val),
|
|
||||||
zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
req.Response.Header.Set(fasthttp.HeaderLastModified,
|
|
||||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
|
||||||
case object.AttributeContentType:
|
|
||||||
contentType = val
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
idsToResponse(&req.Response, &rObj.Header)
|
|
||||||
|
|
||||||
|
contentType := attrs[object.AttributeContentType]
|
||||||
if len(contentType) == 0 {
|
if len(contentType) == 0 {
|
||||||
// determine the Content-Type from the payload head
|
// determine the Content-Type from the payload head
|
||||||
var payloadHead []byte
|
var payloadHead []byte
|
||||||
|
|
||||||
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
|
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
|
||||||
return rObj.Payload, nil
|
return payload, nil
|
||||||
})
|
})
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
||||||
|
@ -126,16 +120,27 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
||||||
var headReader io.Reader = bytes.NewReader(payloadHead)
|
var headReader io.Reader = bytes.NewReader(payloadHead)
|
||||||
|
|
||||||
if err != io.EOF { // otherwise, we've already read full payload
|
if err != io.EOF { // otherwise, we've already read full payload
|
||||||
headReader = io.MultiReader(headReader, rObj.Payload)
|
headReader = io.MultiReader(headReader, payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
// note: we could do with io.Reader, but SetBodyStream below closes body stream
|
// note: we could do with io.Reader, but SetBodyStream below closes body stream
|
||||||
// if it implements io.Closer and that's useful for us.
|
// if it implements io.Closer and that's useful for us.
|
||||||
rObj.Payload = readCloser{headReader, rObj.Payload}
|
payload = readCloser{headReader, payload}
|
||||||
}
|
}
|
||||||
req.SetContentType(contentType)
|
req.SetContentType(contentType)
|
||||||
|
|
||||||
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
req.Response.SetBodyStream(payload, int(payloadSize))
|
||||||
|
}
|
||||||
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
|
||||||
|
func makeAttributesMap(attrs []object.Attribute) map[string]string {
|
||||||
|
attributes := make(map[string]string)
|
||||||
|
for _, attr := range attrs {
|
||||||
|
if !isValidToken(attr.Key()) || !isValidValue(attr.Value()) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
key := utils.BackwardTransformIfSystem(attr.Key())
|
||||||
|
|
||||||
|
attributes[key] = attr.Value()
|
||||||
|
}
|
||||||
|
return attributes
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,14 +2,19 @@ package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -31,6 +36,43 @@ func (r *request) handleFrostFSErr(err error, start time.Time) {
|
||||||
response.Error(r.RequestCtx, msg, statusCode)
|
response.Error(r.RequestCtx, msg, statusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *request) setAttributes(attrs map[string]string) {
|
||||||
|
for key, val := range attrs {
|
||||||
|
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) setIDs(obj object.Object) {
|
||||||
|
objID, _ := obj.ID()
|
||||||
|
cnrID, _ := obj.ContainerID()
|
||||||
|
r.Response.Header.Set(hdrObjectID, objID.String())
|
||||||
|
r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String())
|
||||||
|
r.Response.Header.Set(hdrContainerID, cnrID.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) setDisposition(shouldDownload bool, filename string) {
|
||||||
|
const (
|
||||||
|
inlineDisposition = "inline"
|
||||||
|
attachmentDisposition = "attachment"
|
||||||
|
)
|
||||||
|
dis := inlineDisposition
|
||||||
|
if shouldDownload {
|
||||||
|
dis = attachmentDisposition
|
||||||
|
}
|
||||||
|
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) setTimestamp(timestamp string) error {
|
||||||
|
value, err := strconv.ParseInt(timestamp, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||||
|
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func bearerToken(ctx context.Context) *bearer.Token {
|
func bearerToken(ctx context.Context) *bearer.Token {
|
||||||
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
|
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
|
||||||
return tkn
|
return tkn
|
||||||
|
|
|
@ -33,9 +33,9 @@ func NewFrostFS(p *pool.Pool) *FrostFS {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Container implements frostfs.FrostFS interface method.
|
// Container implements frostfs.FrostFS interface method.
|
||||||
func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer) (*container.Container, error) {
|
func (x *FrostFS) Container(ctx context.Context, containerPrm handler.PrmContainer) (*container.Container, error) {
|
||||||
prm := pool.PrmContainerGet{
|
prm := pool.PrmContainerGet{
|
||||||
ContainerID: layerPrm.ContainerID,
|
ContainerID: containerPrm.ContainerID,
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := x.pool.GetContainer(ctx, prm)
|
res, err := x.pool.GetContainer(ctx, prm)
|
238
internal/service/frostfs/multi_object_reader.go
Normal file
238
internal/service/frostfs/multi_object_reader.go
Normal file
|
@ -0,0 +1,238 @@
|
||||||
|
package frostfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GetFrostFSParams struct {
|
||||||
|
// payload range
|
||||||
|
Off, Ln uint64
|
||||||
|
|
||||||
|
Oid oid.ID
|
||||||
|
BktInfo *data.BucketInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
type PartObj struct {
|
||||||
|
OID oid.ID
|
||||||
|
Size uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type readerInitiator interface {
|
||||||
|
InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MultiObjectReader implements io.Reader of payloads of the object list stored in the FrostFS network.
|
||||||
|
type MultiObjectReader struct {
|
||||||
|
ctx context.Context
|
||||||
|
log *zap.Logger
|
||||||
|
|
||||||
|
layer readerInitiator
|
||||||
|
|
||||||
|
startPartOffset uint64
|
||||||
|
endPartLength uint64
|
||||||
|
|
||||||
|
prm GetFrostFSParams
|
||||||
|
|
||||||
|
curIndex int
|
||||||
|
curReader io.Reader
|
||||||
|
|
||||||
|
parts []PartObj
|
||||||
|
}
|
||||||
|
|
||||||
|
type MultiObjectReaderConfig struct {
|
||||||
|
Initiator readerInitiator
|
||||||
|
Log *zap.Logger
|
||||||
|
|
||||||
|
// the offset of complete object and total size to read
|
||||||
|
Off, Ln uint64
|
||||||
|
|
||||||
|
BktInfo *data.BucketInfo
|
||||||
|
Parts []PartObj
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
errOffsetIsOutOfRange = errors.New("offset is out of payload range")
|
||||||
|
errLengthIsOutOfRange = errors.New("length is out of payload range")
|
||||||
|
errEmptyPartsList = errors.New("empty parts list")
|
||||||
|
errorZeroRangeLength = errors.New("zero range length")
|
||||||
|
)
|
||||||
|
|
||||||
|
func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) {
|
||||||
|
combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{
|
||||||
|
PrmAuth: handler.PrmAuth{BearerToken: p.Bearer},
|
||||||
|
Address: p.ObjInfo.Address(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get combined object '%s': %w", p.ObjInfo.ID.EncodeToString(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var parts []*data.PartInfo
|
||||||
|
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objParts := make([]PartObj, len(parts))
|
||||||
|
for i, part := range parts {
|
||||||
|
objParts[i] = PartObj{
|
||||||
|
OID: part.OID,
|
||||||
|
Size: part.Size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewMultiObjectReader(ctx, MultiObjectReaderConfig{
|
||||||
|
Initiator: x,
|
||||||
|
Off: p.Off,
|
||||||
|
Ln: p.Ln,
|
||||||
|
Parts: objParts,
|
||||||
|
BktInfo: p.BktInfo,
|
||||||
|
Log: p.Log,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) {
|
||||||
|
if len(cfg.Parts) == 0 {
|
||||||
|
return nil, errEmptyPartsList
|
||||||
|
}
|
||||||
|
|
||||||
|
r := &MultiObjectReader{
|
||||||
|
ctx: ctx,
|
||||||
|
layer: cfg.Initiator,
|
||||||
|
prm: GetFrostFSParams{
|
||||||
|
BktInfo: cfg.BktInfo,
|
||||||
|
},
|
||||||
|
parts: cfg.Parts,
|
||||||
|
log: cfg.Log,
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.Off+cfg.Ln == 0 {
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.Off > 0 && cfg.Ln == 0 {
|
||||||
|
return nil, errorZeroRangeLength
|
||||||
|
}
|
||||||
|
|
||||||
|
startPartIndex, startPartOffset := findStartPart(cfg)
|
||||||
|
if startPartIndex == -1 {
|
||||||
|
return nil, errOffsetIsOutOfRange
|
||||||
|
}
|
||||||
|
r.startPartOffset = startPartOffset
|
||||||
|
|
||||||
|
endPartIndex, endPartLength := findEndPart(cfg)
|
||||||
|
if endPartIndex == -1 {
|
||||||
|
return nil, errLengthIsOutOfRange
|
||||||
|
}
|
||||||
|
r.endPartLength = endPartLength
|
||||||
|
|
||||||
|
r.parts = cfg.Parts[startPartIndex : endPartIndex+1]
|
||||||
|
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func findStartPart(cfg MultiObjectReaderConfig) (index int, offset uint64) {
|
||||||
|
position := cfg.Off
|
||||||
|
for i, part := range cfg.Parts {
|
||||||
|
// Strict inequality when searching for start position to avoid reading zero length part.
|
||||||
|
if position < part.Size {
|
||||||
|
return i, position
|
||||||
|
}
|
||||||
|
position -= part.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func findEndPart(cfg MultiObjectReaderConfig) (index int, length uint64) {
|
||||||
|
position := cfg.Off + cfg.Ln
|
||||||
|
for i, part := range cfg.Parts {
|
||||||
|
// Non-strict inequality when searching for end position to avoid out of payload range error.
|
||||||
|
if position <= part.Size {
|
||||||
|
return i, position
|
||||||
|
}
|
||||||
|
position -= part.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *MultiObjectReader) Read(p []byte) (n int, err error) {
|
||||||
|
if x.curReader != nil {
|
||||||
|
n, err = x.curReader.Read(p)
|
||||||
|
if !errors.Is(err, io.EOF) {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
x.curIndex++
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.curIndex == len(x.parts) {
|
||||||
|
return n, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
x.prm.Oid = x.parts[x.curIndex].OID
|
||||||
|
|
||||||
|
if x.curIndex == 0 {
|
||||||
|
x.prm.Off = x.startPartOffset
|
||||||
|
x.prm.Ln = x.parts[x.curIndex].Size - x.startPartOffset
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.curIndex == len(x.parts)-1 {
|
||||||
|
x.prm.Ln = x.endPartLength - x.prm.Off
|
||||||
|
}
|
||||||
|
|
||||||
|
x.curReader, err = x.layer.InitFrostFSObjectPayloadReader(x.ctx, x.prm)
|
||||||
|
if err != nil {
|
||||||
|
return n, fmt.Errorf("init payload reader for the next part: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
x.prm.Off = 0
|
||||||
|
x.prm.Ln = 0
|
||||||
|
|
||||||
|
next, err := x.Read(p[n:])
|
||||||
|
|
||||||
|
return n + next, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object.
|
||||||
|
// Zero range corresponds to full payload (panics if only offset is set).
|
||||||
|
func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error) {
|
||||||
|
var prmAuth handler.PrmAuth
|
||||||
|
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(p.BktInfo.CID)
|
||||||
|
addr.SetObject(p.Oid)
|
||||||
|
|
||||||
|
if p.Off+p.Ln != 0 {
|
||||||
|
prm := handler.PrmObjectRange{
|
||||||
|
PrmAuth: prmAuth,
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
|
Object: p.Oid,
|
||||||
|
PayloadRange: [2]uint64{p.Off, p.Ln},
|
||||||
|
Address: addr,
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.RangeObject(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := handler.PrmObjectGet{
|
||||||
|
PrmAuth: prmAuth,
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
|
Object: p.Oid,
|
||||||
|
Address: addr,
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := x.GetObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Payload, nil
|
||||||
|
}
|
137
internal/service/frostfs/multi_object_reader_test.go
Normal file
137
internal/service/frostfs/multi_object_reader_test.go
Normal file
|
@ -0,0 +1,137 @@
|
||||||
|
package frostfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type readerInitiatorMock struct {
|
||||||
|
parts map[oid.ID][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *readerInitiatorMock) InitFrostFSObjectPayloadReader(_ context.Context, p GetFrostFSParams) (io.Reader, error) {
|
||||||
|
partPayload, ok := r.parts[p.Oid]
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("part not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Off+p.Ln == 0 {
|
||||||
|
return bytes.NewReader(partPayload), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Off > uint64(len(partPayload)-1) {
|
||||||
|
return nil, fmt.Errorf("invalid offset: %d/%d", p.Off, len(partPayload))
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Off+p.Ln > uint64(len(partPayload)) {
|
||||||
|
return nil, fmt.Errorf("invalid range: %d-%d/%d", p.Off, p.Off+p.Ln, len(partPayload))
|
||||||
|
}
|
||||||
|
|
||||||
|
return bytes.NewReader(partPayload[p.Off : p.Off+p.Ln]), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareDataReader() ([]byte, []PartObj, *readerInitiatorMock) {
|
||||||
|
mockInitReader := &readerInitiatorMock{
|
||||||
|
parts: map[oid.ID][]byte{
|
||||||
|
oidtest.ID(): []byte("first part 1"),
|
||||||
|
oidtest.ID(): []byte("second part 2"),
|
||||||
|
oidtest.ID(): []byte("third part 3"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var fullPayload []byte
|
||||||
|
parts := make([]PartObj, 0, len(mockInitReader.parts))
|
||||||
|
for id, payload := range mockInitReader.parts {
|
||||||
|
parts = append(parts, PartObj{OID: id, Size: uint64(len(payload))})
|
||||||
|
fullPayload = append(fullPayload, payload...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fullPayload, parts, mockInitReader
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiReader(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
fullPayload, parts, mockInitReader := prepareDataReader()
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
off uint64
|
||||||
|
ln uint64
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple read all",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "simple read with length",
|
||||||
|
ln: uint64(len(fullPayload)),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "middle of parts",
|
||||||
|
off: parts[0].Size + 2,
|
||||||
|
ln: 4,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "first and second",
|
||||||
|
off: parts[0].Size - 4,
|
||||||
|
ln: 8,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "first and third",
|
||||||
|
off: parts[0].Size - 4,
|
||||||
|
ln: parts[1].Size + 8,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "second part",
|
||||||
|
off: parts[0].Size,
|
||||||
|
ln: parts[1].Size,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "second and third",
|
||||||
|
off: parts[0].Size,
|
||||||
|
ln: parts[1].Size + parts[2].Size,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "offset out of range",
|
||||||
|
off: uint64(len(fullPayload) + 1),
|
||||||
|
ln: 1,
|
||||||
|
err: errOffsetIsOutOfRange,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "zero length",
|
||||||
|
off: parts[1].Size + 1,
|
||||||
|
ln: 0,
|
||||||
|
err: errorZeroRangeLength,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
multiReader, err := NewMultiObjectReader(ctx, MultiObjectReaderConfig{
|
||||||
|
Initiator: mockInitReader,
|
||||||
|
Parts: parts,
|
||||||
|
Off: tc.off,
|
||||||
|
Ln: tc.ln,
|
||||||
|
})
|
||||||
|
require.ErrorIs(t, err, tc.err)
|
||||||
|
|
||||||
|
if tc.err == nil {
|
||||||
|
off := tc.off
|
||||||
|
ln := tc.ln
|
||||||
|
if off+ln == 0 {
|
||||||
|
ln = uint64(len(fullPayload))
|
||||||
|
}
|
||||||
|
data, err := io.ReadAll(multiReader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, fullPayload[off:off+ln], data)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package services
|
package frostfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -15,16 +15,16 @@ type GetNodeByPathResponseInfoWrapper struct {
|
||||||
response *grpcService.GetNodeByPathResponse_Info
|
response *grpcService.GetNodeByPathResponse_Info
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
|
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() []uint64 {
|
||||||
return n.response.GetNodeId()
|
return []uint64{n.response.GetNodeId()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
|
func (n GetNodeByPathResponseInfoWrapper) GetParentID() []uint64 {
|
||||||
return n.response.GetParentId()
|
return []uint64{n.response.GetParentId()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
|
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() []uint64 {
|
||||||
return n.response.GetTimestamp()
|
return []uint64{n.response.GetTimestamp()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
|
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
|
27
tree/tree.go
27
tree/tree.go
|
@ -73,7 +73,7 @@ type Meta interface {
|
||||||
|
|
||||||
type NodeResponse interface {
|
type NodeResponse interface {
|
||||||
GetMeta() []Meta
|
GetMeta() []Meta
|
||||||
GetTimestamp() uint64
|
GetTimestamp() []uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
||||||
|
@ -144,7 +144,7 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
latestNode, err := getLatestNode(nodes)
|
latestNode, err := getLatestVersionNode(nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -152,17 +152,20 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s
|
||||||
return newNodeVersion(latestNode)
|
return newNodeVersion(latestNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getLatestNode(nodes []NodeResponse) (NodeResponse, error) {
|
func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
|
||||||
var (
|
var (
|
||||||
maxCreationTime uint64
|
maxCreationTime uint64
|
||||||
targetIndexNode = -1
|
targetIndexNode = -1
|
||||||
)
|
)
|
||||||
|
|
||||||
for i, node := range nodes {
|
for i, node := range nodes {
|
||||||
currentCreationTime := node.GetTimestamp()
|
if !checkExistOID(node.GetMeta()) {
|
||||||
if checkExistOID(node.GetMeta()) && currentCreationTime > maxCreationTime {
|
continue
|
||||||
maxCreationTime = currentCreationTime
|
}
|
||||||
|
|
||||||
|
if currentCreationTime := getMaxTimestamp(node); currentCreationTime > maxCreationTime {
|
||||||
targetIndexNode = i
|
targetIndexNode = i
|
||||||
|
maxCreationTime = currentCreationTime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,3 +190,15 @@ func checkExistOID(meta []Meta) bool {
|
||||||
func pathFromName(objectName string) []string {
|
func pathFromName(objectName string) []string {
|
||||||
return strings.Split(objectName, separator)
|
return strings.Split(objectName, separator)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getMaxTimestamp(node NodeResponse) uint64 {
|
||||||
|
var maxTimestamp uint64
|
||||||
|
|
||||||
|
for _, timestamp := range node.GetTimestamp() {
|
||||||
|
if timestamp > maxTimestamp {
|
||||||
|
maxTimestamp = timestamp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return maxTimestamp
|
||||||
|
}
|
||||||
|
|
|
@ -21,10 +21,10 @@ func (m nodeMeta) GetValue() []byte {
|
||||||
|
|
||||||
type nodeResponse struct {
|
type nodeResponse struct {
|
||||||
meta []nodeMeta
|
meta []nodeMeta
|
||||||
timestamp uint64
|
timestamp []uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n nodeResponse) GetTimestamp() uint64 {
|
func (n nodeResponse) GetTimestamp() []uint64 {
|
||||||
return n.timestamp
|
return n.timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
name: "one node of the object version",
|
name: "one node of the object version",
|
||||||
nodes: []NodeResponse{
|
nodes: []NodeResponse{
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 1,
|
timestamp: []uint64{1},
|
||||||
meta: []nodeMeta{
|
meta: []nodeMeta{
|
||||||
{
|
{
|
||||||
key: oidKV,
|
key: oidKV,
|
||||||
|
@ -67,11 +67,11 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
name: "one node of the object version and one node of the secondary object",
|
name: "one node of the object version and one node of the secondary object",
|
||||||
nodes: []NodeResponse{
|
nodes: []NodeResponse{
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 3,
|
timestamp: []uint64{3},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 1,
|
timestamp: []uint64{1},
|
||||||
meta: []nodeMeta{
|
meta: []nodeMeta{
|
||||||
{
|
{
|
||||||
key: oidKV,
|
key: oidKV,
|
||||||
|
@ -86,11 +86,11 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
name: "all nodes represent a secondary object",
|
name: "all nodes represent a secondary object",
|
||||||
nodes: []NodeResponse{
|
nodes: []NodeResponse{
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 3,
|
timestamp: []uint64{3},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 5,
|
timestamp: []uint64{5},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -100,7 +100,7 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
name: "several nodes of different types and with different timestamp",
|
name: "several nodes of different types and with different timestamp",
|
||||||
nodes: []NodeResponse{
|
nodes: []NodeResponse{
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 1,
|
timestamp: []uint64{1},
|
||||||
meta: []nodeMeta{
|
meta: []nodeMeta{
|
||||||
{
|
{
|
||||||
key: oidKV,
|
key: oidKV,
|
||||||
|
@ -109,11 +109,11 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 3,
|
timestamp: []uint64{3},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 4,
|
timestamp: []uint64{4},
|
||||||
meta: []nodeMeta{
|
meta: []nodeMeta{
|
||||||
{
|
{
|
||||||
key: oidKV,
|
key: oidKV,
|
||||||
|
@ -122,7 +122,7 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
nodeResponse{
|
nodeResponse{
|
||||||
timestamp: 6,
|
timestamp: []uint64{6},
|
||||||
meta: []nodeMeta{},
|
meta: []nodeMeta{},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -130,7 +130,7 @@ func TestGetLatestNode(t *testing.T) {
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
actualNode, err := getLatestNode(tc.nodes)
|
actualNode, err := getLatestVersionNode(tc.nodes)
|
||||||
if tc.error {
|
if tc.error {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in a new issue